Author: robbie Date: Tue May 29 11:39:25 2012 New Revision: 1343680 URL: http://svn.apache.org/viewvc?rev=1343680&view=rev Log: QPID-3986, QPID-4009, QPID-4017: add constants for system properties/defaults. Update default values for flow control timeouts to be consistent between 0-8/9/9-1 and 0-10 client paths (60sec). Increase the 'failover method timeout' for 0-8/9/9-1 client path to 120sec. Update documentation accordingly.
Work by Philip Harvey <p...@philharveyonline.com> and myself, based on review feedback. Modified: qpid/trunk/qpid/doc/book/src/java-broker/Producer-Flow-Control.xml qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Modified: qpid/trunk/qpid/doc/book/src/java-broker/Producer-Flow-Control.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/java-broker/Producer-Flow-Control.xml?rev=1343680&r1=1343679&r2=1343680&view=diff ============================================================================== --- qpid/trunk/qpid/doc/book/src/java-broker/Producer-Flow-Control.xml (original) +++ qpid/trunk/qpid/doc/book/src/java-broker/Producer-Flow-Control.xml Tue May 29 11:39:25 2012 @@ -26,9 +26,10 @@ <section role="h2" id="QpidProducerFlowControlGeneralInformation"> <title>General Information</title> <para> - The Qpid 0.6 release introduces a simplistic producer-side flow control mechanism + The Qpid 0.6 release introduced a simplistic producer-side flow control mechanism into the Java Messaging Broker, causing producers to be flow-controlled when they - attempt to send messages to an overfull queue or overfull message store on a virtual host. + attempt to send messages to an overfull queue. Qpid 0.18 introduced a similar + mechanism triggered by an overfull persistent message store on a virtual host. </para> </section> <section role="h2" id="QpidProducerFlowControlServerConfiguration"> @@ -118,10 +119,10 @@ MESSAGE [con:2(guest@anonymous(713889609 Two limits can be configured: </para> <para> - overflow limit - the maximum space on disk (in bytes) which can be used by store. + overfull limit - the maximum space on disk (in bytes) which can be used by store. </para> <para> - underflow limit - when the space on disk drops below this limit, producers are allowed to resume publishing. + underfull limit - when the space on disk drops below this limit, producers are allowed to resume publishing. </para> <para> An example of quota configuration for the BDB message store is provided below. @@ -173,16 +174,12 @@ MESSAGE [con:2(guest@anonymous(713889609 While blocking the client will periodically log the fact that it is blocked waiting on flow control. </para> <programlisting> -WARN AMQSession - Broker enforced flow control has been enforced -WARN AMQSession - Message send delayed by 5s due to broker enforced flow control -WARN AMQSession - Message send delayed by 10s due to broker enforced flow control +WARN Message send delayed by 5s due to broker enforced flow control +WARN Message send delayed by 10s due to broker enforced flow control </programlisting> <para> After a set period the send will timeout and throw a JMSException to the calling code. </para> - <programlisting> -ERROR AMQSession - Message send failed due to timeout waiting on broker enforced flow control - </programlisting> <para> If such a JMSException is thrown, the message will not be sent to the broker, however the underlying Session may still be active - in particular if the @@ -196,13 +193,11 @@ ERROR AMQSession - Message send failed d </para> <para> The amount of time (in milliseconds) to wait before timing out - is controlled by the property qpid.flow_control_wait_failure - (the default is 120000 - which is two minutes). + is controlled by the property qpid.flow_control_wait_failure. </para> <para> The frequency at which the log message informing that the producer is flow controlled is sent is controlled by the system property qpid.flow_control_wait_notify_period. - The default value is 5000 milliseconds (i.e. 5 seconds). </para> <para> Adding the following to the command line to start the client would result in a timeout of one minute, @@ -213,32 +208,9 @@ ERROR AMQSession - Message send failed d -Dqpid.flow_control_wait_notify_period=10000 </programlisting> <section role="h3"> - <title>Using failover with flow control</title> - <para> - If the flow control feature is used with failover functionality, and connection is lost when producer - is blocked by the flow control, failover is not started immediately. Failover only proceeds after - the flow control wait interval is expired. - </para> - <para> - However, by default, failover has a shorter timeout for re-establishing the connection than the flow control timeout - (one minute and two minutes respectively), therefore a long-running flow control situation could cause failover to fail. - </para> - <para> - In order to avoid this from happening the failover timeout can be set to the value bigger - than flow control wait interval. To set the failover timeout the JVM setting - qpid.failover_method_timeout can be used as follows - </para> - <programlisting> --Dqpid.failover_method_timeout=180000 - </programlisting> - <para> - Setting the flow wait interval to a value lower than the failover timeout would allow failover to restore connectivity. - </para> - </section> <!-- Using failover --> - <section role="h3"> <title>Older Clients</title> <para> - This feature was added for the 0.6 releaase of the Java Broker. If an older client connects to the broker then the flow control commands will be ignored and they will not be blocked. So to fully benefit from this new feature both Client and Broker need to be at least version 0.6. + The flow control feature was first added to the Java broker/client in the 0.6 release. If an older client connects to the broker then the flow control commands will be ignored by it and it will not be blocked. So to fully benefit from this feature both Client and Broker need to be at least version 0.6. </para> </section> </section> <!-- Client impact and configuration --> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1343680&r1=1343679&r2=1343680&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue May 29 11:39:25 2012 @@ -20,6 +20,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,18 +122,17 @@ public abstract class AMQSession<C exten /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); /** * The period to wait while flow controlled before declaring a failure */ - private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, DEFAULT_FLOW_CONTROL_WAIT_FAILURE); private final boolean _delareQueues = Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java?rev=1343680&r1=1343679&r2=1343680&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java Tue May 29 11:39:25 2012 @@ -33,9 +33,7 @@ public class FailoverPolicy { private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class); - private static final long MINUTE = 60000L; - - private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 1 * MINUTE); + private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 120000); private FailoverMethod[] _methods = new FailoverMethod[1]; Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1343680&r1=1343679&r2=1343680&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Tue May 29 11:39:25 2012 @@ -168,4 +168,28 @@ public class ClientProperties public static final String SEND_BUFFER_SIZE_PROP_NAME = "qpid.send_buffer_size"; @Deprecated public static final String LEGACY_SEND_BUFFER_SIZE_PROP_NAME = "amqj.sendBufferSize"; + + /** + * System property to set the time (in millis) to wait before failing when sending and + * the client has been flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_FAILURE = "qpid.flow_control_wait_failure"; + + /** + * Default time (in millis) to wait before failing when sending and the client has been + * flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 60000L; + + /** + * System property to set the time (in millis) between log notifications that a + * send is waiting because the client was flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = "qpid.flow_control_wait_notify_period"; + + /** + * Default time (in millis) between log notifications that a send is + * waiting because the client was flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = 5000L; } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1343680&r1=1343679&r2=1343680&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue May 29 11:39:25 2012 @@ -94,8 +94,10 @@ public class Session extends SessionInvo private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); - private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout); - private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); private boolean autoSync = false; @@ -215,12 +217,6 @@ public class Session extends SessionInvo return this.state; } - public boolean isFlowControlled() - { - return flowControl; - } - - void setFlowControl(boolean value) { flowControl = value; @@ -1201,6 +1197,6 @@ public class Session extends SessionInvo */ public boolean isFlowBlocked() { - return isFlowControlled() && credit.availablePermits() == 0; + return flowControl && credit.availablePermits() == 0; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org