Author: kwall
Date: Tue Sep 11 22:19:00 2012
New Revision: 1383638

URL: http://svn.apache.org/viewvc?rev=1383638&view=rev
Log:
QPID-4296: Push down 0-8..0-9-1 flow control implementation to AMQSession_0_8 
(refactoring)

Move the method implementations isFlowBlocked(), setFlowControl() and
checkFlowControl(), together with their associated fields and the nested
FlowControlIndicator class, down to AMQSession_0_8.

On 0-10, isFlowBlocked() was already overridden to delegate to the 0-10
transport layer.
The operation setFlowControl() makes no sense for 0-10, so it is implemented
there to throw UnsupportedOperationException.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Sep 11 22:19:00 2012
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.client;
 
-import static 
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
-import static 
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-import static 
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
-import static 
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,19 +115,6 @@ public abstract class AMQSession<C exten
     /** Immediate message prefetch default. */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
-    /**
-     * The period to wait while flow controlled before sending a log message 
confirming that the session is still
-     * waiting on flow control being revoked
-     */
-    private final long _flowControlWaitPeriod = 
Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
-                                                                 
DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
-
-    /**
-     * The period to wait while flow controlled before declaring a failure
-     */
-    private final long _flowControlWaitFailure = 
Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
-                                                                  
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
     private final boolean _delareQueues =
         
Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME,
 "true"));
 
@@ -263,11 +245,6 @@ public abstract class AMQSession<C exten
     /** Has failover occured on this session with outstanding actions to 
commit? */
     private boolean _failedOverDirty;
 
-    /** Flow control */
-    private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
-
-
     /** Holds the highest received delivery tag. */
     protected AtomicLong getHighestDeliveryTag()
     {
@@ -406,22 +383,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    private static final class FlowControlIndicator
-    {
-        private volatile boolean _flowControl = true;
-
-        public synchronized void setFlowControl(boolean flowControl)
-        {
-            _flowControl = flowControl;
-            notify();
-        }
-
-        public boolean getFlowControl()
-        {
-            return _flowControl;
-        }
-    }
-
     /**
      * Creates a new session on a connection.
      *
@@ -3087,47 +3048,14 @@ public abstract class AMQSession<C exten
         _ticket = ticket;
     }
 
-    public boolean isFlowBlocked()
-    {
-        synchronized (_flowControl)
-        {
-            return !_flowControl.getFlowControl();
-        }
-    }
-
-    public void setFlowControl(final boolean active)
-    {
-        _flowControl.setFlowControl(active);
-        if (_logger.isInfoEnabled())
-        {
-            _logger.info("Broker enforced flow control " + (active ? "no 
longer in effect" : "has been enforced"));
-        }
-    }
-
-    public void checkFlowControl() throws InterruptedException, JMSException
-    {
-        long expiryTime = 0L;
-        synchronized (_flowControl)
-        {
-            while (!_flowControl.getFlowControl() &&
-                   (expiryTime == 0L ? (expiryTime = 
System.currentTimeMillis() + _flowControlWaitFailure)
-                                     : expiryTime) >= 
System.currentTimeMillis() )
-            {
-
-                _flowControl.wait(_flowControlWaitPeriod);
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Message send delayed by " + 
(System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s 
due to broker enforced flow control");
-                }
-            }
-            if(!_flowControl.getFlowControl())
-            {
-                _logger.error("Message send failed due to timeout waiting on 
broker enforced flow control");
-                throw new JMSException("Unable to send message for " + 
_flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
-            }
-        }
+    /**
+     * Tests whether flow to this session is blocked.
+     *
+     * @return true if flow is blocked or false otherwise.
+     */
+    public abstract boolean isFlowBlocked();
 
-    }
+    public abstract void setFlowControl(final boolean active);
 
     public interface Dispatchable
     {

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Sep 11 22:19:00 2012
@@ -1410,6 +1410,13 @@ public class AMQSession_0_10 extends AMQ
         return _qpidSession.isFlowBlocked();
     }
 
+    @Override
+    public void setFlowControl(boolean active)
+    {
+        // Supported by 0-8..0-9-1 only
+        throw new UnsupportedOperationException("Operation not supported by 
this protocol");
+    }
+
     private void cancelTimerTask()
     {
         if (flushTask != null)

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Tue Sep 11 22:19:00 2012
@@ -21,6 +21,11 @@
 package org.apache.qpid.client;
 
 
+import static 
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static 
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static 
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static 
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +65,22 @@ public class AMQSession_0_8 extends AMQS
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession.class);
 
     /**
+     * The period to wait while flow controlled before sending a log message 
confirming that the session is still
+     * waiting on flow control being revoked
+     */
+    private final long _flowControlWaitPeriod = 
Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+                                                                 
DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+
+    /**
+     * The period to wait while flow controlled before declaring a failure
+     */
+    private final long _flowControlWaitFailure = 
Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
+                                                                  
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
+    /**
      * Creates a new session on a connection.
      *
      * @param con                     The connection on which to create the 
session.
@@ -728,6 +749,49 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
+    public boolean isFlowBlocked()
+    {
+        synchronized (_flowControl)
+        {
+            return !_flowControl.getFlowControl();
+        }
+    }
+
+    public void setFlowControl(final boolean active)
+    {
+        _flowControl.setFlowControl(active);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Broker enforced flow control " + (active ? "no 
longer in effect" : "has been enforced"));
+        }
+    }
+
+    void checkFlowControl() throws InterruptedException, JMSException
+    {
+        long expiryTime = 0L;
+        synchronized (_flowControl)
+        {
+            while (!_flowControl.getFlowControl() &&
+                   (expiryTime == 0L ? (expiryTime = 
System.currentTimeMillis() + _flowControlWaitFailure)
+                                     : expiryTime) >= 
System.currentTimeMillis() )
+            {
+
+                _flowControl.wait(_flowControlWaitPeriod);
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Message send delayed by " + 
(System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s 
due to broker enforced flow control");
+                }
+            }
+            if(!_flowControl.getFlowControl())
+            {
+                _logger.error("Message send failed due to timeout waiting on 
broker enforced flow control");
+                throw new JMSException("Unable to send message for " + 
_flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
+            }
+        }
+    }
+
+
+
     public abstract static class DestinationCache<T extends AMQDestination>
     {
         private final Map<AMQShortString, Map<AMQShortString, T>> cache = new 
HashMap<AMQShortString, Map<AMQShortString, T>>();
@@ -775,6 +839,22 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
+    private static final class FlowControlIndicator
+    {
+        private volatile boolean _flowControl = true;
+
+        public synchronized void setFlowControl(boolean flowControl)
+        {
+            _flowControl = flowControl;
+            notify();
+        }
+
+        public boolean getFlowControl()
+        {
+            return _flowControl;
+        }
+    }
+
     private final TopicDestinationCache _topicDestinationCache = new 
TopicDestinationCache();
     private final QueueDestinationCache _queueDestinationCache = new 
QueueDestinationCache();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to