Author: kwall
Date: Tue Sep 11 22:19:00 2012
New Revision: 1383638
URL: http://svn.apache.org/viewvc?rev=1383638&view=rev
Log:
QPID-4296: Push down 0-8..0-9-1 flow control implementation to AMQSession_0_8
(refactoring)
Move the implementations of isFlowBlocked(), setFlowControl() and
checkFlowControl(), together with their associated fields,
down to AMQSession_0_8.
On 0-10, isFlowBlocked() was already overridden to delegate to the 0-10
transport layer.
The operation setFlowControl() makes no sense for 0-10, so there it is
implemented to throw
UnsupportedOperationException.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Sep 11 22:19:00 2012
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client;
-import static
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
-import static
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-import static
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
-import static
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,19 +115,6 @@ public abstract class AMQSession<C exten
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- /**
- * The period to wait while flow controlled before sending a log message
confirming that the session is still
- * waiting on flow control being revoked
- */
- private final long _flowControlWaitPeriod =
Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
-
DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
-
- /**
- * The period to wait while flow controlled before declaring a failure
- */
- private final long _flowControlWaitFailure =
Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
-
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
private final boolean _delareQueues =
Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME,
"true"));
@@ -263,11 +245,6 @@ public abstract class AMQSession<C exten
/** Has failover occured on this session with outstanding actions to
commit? */
private boolean _failedOverDirty;
- /** Flow control */
- private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
-
-
/** Holds the highest received delivery tag. */
protected AtomicLong getHighestDeliveryTag()
{
@@ -406,22 +383,6 @@ public abstract class AMQSession<C exten
}
}
- private static final class FlowControlIndicator
- {
- private volatile boolean _flowControl = true;
-
- public synchronized void setFlowControl(boolean flowControl)
- {
- _flowControl = flowControl;
- notify();
- }
-
- public boolean getFlowControl()
- {
- return _flowControl;
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -3087,47 +3048,14 @@ public abstract class AMQSession<C exten
_ticket = ticket;
}
- public boolean isFlowBlocked()
- {
- synchronized (_flowControl)
- {
- return !_flowControl.getFlowControl();
- }
- }
-
- public void setFlowControl(final boolean active)
- {
- _flowControl.setFlowControl(active);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Broker enforced flow control " + (active ? "no
longer in effect" : "has been enforced"));
- }
- }
-
- public void checkFlowControl() throws InterruptedException, JMSException
- {
- long expiryTime = 0L;
- synchronized (_flowControl)
- {
- while (!_flowControl.getFlowControl() &&
- (expiryTime == 0L ? (expiryTime =
System.currentTimeMillis() + _flowControlWaitFailure)
- : expiryTime) >=
System.currentTimeMillis() )
- {
-
- _flowControl.wait(_flowControlWaitPeriod);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Message send delayed by " +
(System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s
due to broker enforced flow control");
- }
- }
- if(!_flowControl.getFlowControl())
- {
- _logger.error("Message send failed due to timeout waiting on
broker enforced flow control");
- throw new JMSException("Unable to send message for " +
_flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
- }
- }
+ /**
+ * Tests whether flow to this session is blocked.
+ *
+ * @return true if flow is blocked or false otherwise.
+ */
+ public abstract boolean isFlowBlocked();
- }
+ public abstract void setFlowControl(final boolean active);
public interface Dispatchable
{
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Sep 11 22:19:00 2012
@@ -1410,6 +1410,13 @@ public class AMQSession_0_10 extends AMQ
return _qpidSession.isFlowBlocked();
}
+ @Override
+ public void setFlowControl(boolean active)
+ {
+ // Supported by 0-8..0-9-1 only
+ throw new UnsupportedOperationException("Operation not supported by
this protocol");
+ }
+
private void cancelTimerTask()
{
if (flushTask != null)
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1383638&r1=1383637&r2=1383638&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Tue Sep 11 22:19:00 2012
@@ -21,6 +21,11 @@
package org.apache.qpid.client;
+import static
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static
org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static
org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,22 @@ public class AMQSession_0_8 extends AMQS
private static final Logger _logger =
LoggerFactory.getLogger(AMQSession.class);
/**
+ * The period to wait while flow controlled before sending a log message
confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ private final long _flowControlWaitPeriod =
Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+
DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ private final long _flowControlWaitFailure =
Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
+
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
+ /**
* Creates a new session on a connection.
*
* @param con The connection on which to create the
session.
@@ -728,6 +749,49 @@ public class AMQSession_0_8 extends AMQS
}
}
+ public boolean isFlowBlocked()
+ {
+ synchronized (_flowControl)
+ {
+ return !_flowControl.getFlowControl();
+ }
+ }
+
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Broker enforced flow control " + (active ? "no
longer in effect" : "has been enforced"));
+ }
+ }
+
+ void checkFlowControl() throws InterruptedException, JMSException
+ {
+ long expiryTime = 0L;
+ synchronized (_flowControl)
+ {
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime =
System.currentTimeMillis() + _flowControlWaitFailure)
+ : expiryTime) >=
System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(_flowControlWaitPeriod);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Message send delayed by " +
(System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s
due to broker enforced flow control");
+ }
+ }
+ if(!_flowControl.getFlowControl())
+ {
+ _logger.error("Message send failed due to timeout waiting on
broker enforced flow control");
+ throw new JMSException("Unable to send message for " +
_flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
+ }
+ }
+ }
+
+
+
public abstract static class DestinationCache<T extends AMQDestination>
{
private final Map<AMQShortString, Map<AMQShortString, T>> cache = new
HashMap<AMQShortString, Map<AMQShortString, T>>();
@@ -775,6 +839,22 @@ public class AMQSession_0_8 extends AMQS
}
}
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl = flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
private final TopicDestinationCache _topicDestinationCache = new
TopicDestinationCache();
private final QueueDestinationCache _queueDestinationCache = new
QueueDestinationCache();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]