[ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Tully resolved AMQ-3331.
-----------------------------
Resolution: Fixed
Variant of patch applied with thanks in:
http://svn.apache.org/viewvc?rev=1186813&view=rev
Added a boolean attribute, {{alwaysSyncSend}}, to the network connector. This
allows the behavior for persistent messages to be applied to non-persistent
messages as well.
It does not make sense to have an 'always Async' mode, as this could lead to
lost persistent messages. As a result, the need for the enum goes away.
Also, I agree with the assessment of the use of the responseRequired flag in
the choice of sync/async send: it should be based on the persistence attribute
of the message, and I have fixed that.
This is vital so that messages sent in transactions are not sent async.
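The rule described above can be modelled as a single predicate. This is a minimal sketch under stated assumptions, not the committed code: only {{alwaysSyncSend}} comes from the comment, and the class and method names are hypothetical. "Sync" here means the bridge forwards with a request and acks the local broker only after the remote response arrives.

```java
// Hypothetical model of the post-fix forwarding rule. Only the alwaysSyncSend
// attribute comes from the resolution; ForwardingRule and forwardWithResponse
// are illustrative names, not ActiveMQ API.
final class ForwardingRule {
    private ForwardingRule() {}

    /**
     * Returns true when the bridge should forward with a request/response
     * (remoteBroker.asyncRequest), false when a fire-and-forget oneway is allowed.
     * The decision uses the message's persistence, not its responseRequired flag,
     * so a persistent message sent inside a transaction is still forwarded sync.
     */
    static boolean forwardWithResponse(boolean persistent, boolean alwaysSyncSend) {
        return alwaysSyncSend || persistent;
    }

    public static void main(String[] args) {
        System.out.println(forwardWithResponse(true, false));  // persistent: prints true
        System.out.println(forwardWithResponse(false, false)); // non-persistent, default: prints false
        System.out.println(forwardWithResponse(false, true));  // non-persistent, alwaysSyncSend: prints true
    }
}
```

Because the predicate ignores {{responseRequired}}, a transacted persistent message (which the producer sends without requesting a response) no longer falls into the oneway path on the bridge.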
Thanks for the great test case, made life much easier.
> When a producer from a network bridge is blocked by producer flow control,
> all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3331
> URL: https://issues.apache.org/jira/browse/AMQ-3331
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker, Test Cases, Transport
> Affects Versions: 5.5.0
> Reporter: Stirling Chow
> Assignee: Gary Tully
> Fix For: 5.6.0
>
> Attachments: AMQ-3331.patch,
> NetworkBridgeProducerFlowControlPrePatchTest.java,
> NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2. Broker B consumes
> messages from two queues, Q1 and Q2. Broker A is connected by a demand
> forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2
> will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2
> consumer is slow), and this triggers producer flow control to halt new
> messages being sent to Broker B's Q2 over the bridge. Broker A's instances
> of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour
> over the course of the production of 1000 messages to both Q1/Q2, where
> Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A               Bridge      Broker B
> ========                           ========
> Q1: 0->1000->0        ------>      Q1: 0->1000->...
> Q2: 0->1000->500                   Q2: 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are
> what we expected, namely:
> # Broker A produces 1000 messages to Q1 without blocking and all of these
> messages are forwarded to Broker B's Q1 without blocking, eventually being
> consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these
> messages are forwarded to Broker B's Q2 before producer flow control blocks
> the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e.,
> producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this
> behaviour:
> {noformat}
> Broker A               Bridge      Broker B
> ========                           ========
> Q1: 0->1000->500      ------>      Q1: 0->500->...
> Q2: 0->1000->500                   Q2: 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are
> not what we expected, namely: producer flow control on Broker B's instance of
> Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers
> producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost non-intuitive until you understand the
> cause) that persistent messages should behave better than non-persistent
> messages. The same difference in behaviour can also be observed with
> persistent messages if Broker A sends them outside a JMS transaction (e.g.,
> AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves
> appropriately, with Q1 independent of Q2, but inside behaves the same as the
> non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding
> producer flow control: {quote}As of ActiveMQ 5.0, we can now individually
> flow control each producer on a shared connection without having to suspend
> the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and
> transactional/non-transactional) messages is due to the two ways that
> org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>             + message.getProducerId() + ") stopped to prevent flooding "
>             + getActiveMQDestination().getQualifiedName() + "."
>             + " See http://activemq.apache.org/producer-flow-control.html for more info");
> }
> {code}
> There is only a single transport thread that services the TCP socket on
> Broker B. This TCP socket is the "remote" end of the A->B bridge and is
> responsible for *sequentially* enqueueing to Broker B's queues all messages
> from Broker A. When a persistent, non-transactional message is sent to
> Broker A's queues, it has +responseRequired=true+, which is preserved when
> the bridge forwards the message to Broker B's queues. If producer flow
> control is triggered on Broker B's queue, the first method of producer flow
> control will be used: the transport thread will not block, but the response
> will be held back until the queue has room. As a result, the transport
> thread is free to continue enqueueing messages from the bridge, particularly
> those destined for queues that are not full (NOTE: since the network prefetch
> is 1, no new messages for the full queue will be forwarded until the response
> is returned).
> When a non-persistent or transactional message is sent to Broker A's queues,
> it has +responseRequired=false+, which is preserved when the bridge forwards
> the message to Broker B's queues. If producer flow control is triggered on
> Broker B's queue, the second method of producer flow control will be used:
> the transport thread will be blocked. As a result, no other messages from
> the bridge will be forwarded, even those destined for queues that are not
> full.
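The two branches of {{Queue#send(...)}} can be illustrated with a toy model of Broker B's single transport thread. This is a hypothetical simulation, not ActiveMQ code: messages for Q1 and Q2 arrive interleaved, consumers are assumed not to drain either queue during the run, Q2 is assumed to have room for 500 messages, and a network prefetch of 1 is modelled by skipping further messages for a queue whose response is being held back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not ActiveMQ code) of one transport thread that
// sequentially enqueues bridged messages into Broker B's queues.
final class TransportThreadModel {
    enum Mode {
        HOLD_RESPONSE,   // responseRequired=true: thread continues, response withheld
        BLOCK_TRANSPORT  // responseRequired=false: waitForSpace() blocks the thread
    }

    /** Returns how many messages each queue on Broker B accepted. */
    static Map<String, Integer> run(List<String> arrivals, Map<String, Integer> capacity, Mode mode) {
        Map<String, Integer> accepted = new LinkedHashMap<>();
        Map<String, Boolean> awaitingResponse = new HashMap<>();
        for (String queue : arrivals) {
            if (awaitingResponse.getOrDefault(queue, false)) {
                // Network prefetch of 1: the bridge forwards nothing more for this
                // queue until the withheld response is returned.
                continue;
            }
            int count = accepted.getOrDefault(queue, 0);
            if (count < capacity.get(queue)) {
                accepted.put(queue, count + 1);
            } else if (mode == Mode.BLOCK_TRANSPORT) {
                break; // the only transport thread is stuck: no queue makes progress
            } else {
                awaitingResponse.put(queue, true); // park the response, keep servicing others
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> arrivals = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            arrivals.add("Q1");
            arrivals.add("Q2");
        }
        Map<String, Integer> capacity = new HashMap<>();
        capacity.put("Q1", 1000);
        capacity.put("Q2", 500); // assumed: Q2's slow consumer leaves room for 500 messages
        System.out.println(run(arrivals, capacity, Mode.HOLD_RESPONSE));
        System.out.println(run(arrivals, capacity, Mode.BLOCK_TRANSPORT));
    }
}
```

Under these assumptions the first line prints {Q1=1000, Q2=500} and the second prints {Q1=501, Q2=500}: once the blocking branch is taken, Q1 stops at roughly the point where Q2 filled up, mirroring the non-persistent table in the Symptom section.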
> The preservation of the {{responseRequired}} flag occurs in
> org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>
>     // If the message was originally sent using async send, we will preserve
>     // that QOS by bridging it using an async send (small chance of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>
> } else {
>
>     // The message was not sent using async send, so we should only ack the local
>     // broker when we get confirmation that the remote broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} comes from the choice between
> {{remoteBroker.oneway(message);}}, which requests no response, and
> {{remoteBroker.asyncRequest(message, callback);}}, which does.
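The chain from producer settings to the bridge's choice of call can be summarised in a few lines. This is a simplified, hypothetical model: it assumes default client settings (useAsyncSend left off), under which an ActiveMQ producer requests a response only for persistent messages sent outside a transaction, and it assumes a producer window size of 0 so that {{responseRequired}} alone selects the {{Queue#send(...)}} branch on Broker B.

```java
// Simplified, hypothetical model of the pre-fix behaviour. PreFixChain and its
// methods are illustrative names, not ActiveMQ API.
final class PreFixChain {
    private PreFixChain() {}

    /** responseRequired as set by the producer on Broker A (default settings assumed). */
    static boolean responseRequired(boolean persistent, boolean transacted) {
        return persistent && !transacted;
    }

    /** The call DemandForwardingBridgeSupport makes before the fix. */
    static String bridgeCall(boolean responseRequired) {
        return responseRequired ? "asyncRequest" : "oneway";
    }

    /** Whether flow control on one queue of Broker B can stall the whole bridge. */
    static boolean canBlockWholeBridge(boolean persistent, boolean transacted) {
        // A oneway forward arrives with responseRequired=false, which (with a
        // producer window of 0) selects the waitForSpace() branch that blocks
        // the transport thread.
        return bridgeCall(responseRequired(persistent, transacted)).equals("oneway");
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean persistent : flags) {
            for (boolean transacted : flags) {
                System.out.printf("persistent=%b transacted=%b -> %s, blocks bridge: %b%n",
                        persistent, transacted,
                        bridgeCall(responseRequired(persistent, transacted)),
                        canBlockWholeBridge(persistent, transacted));
            }
        }
    }
}
```

Only the persistent, non-transacted combination avoids the blocking branch, which matches the observations in the Symptom section.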
> Solution
> ========
> It seems odd that there should be any concern for the message's original
> {{responseRequired}} flag. Once the message is dispatched to the bridge for
> forwarding, the original producer no longer cares and is not waiting for a
> response. Once a response is returned from the remote broker, it is only
> used to trigger the message ACK to the local broker so that the inflight
> and dequeue counts can be updated; neither the response nor the ACK
> continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one
> queue can have a serious side effect (blocking the bridge completely), I
> think the best solution is to remove the logic from
> DemandForwardingBridgeSupport that takes into account
> {{message.isResponseRequired}} and simply always forward the message with
> {{remoteBroker.asyncRequest(message, callback);}}
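The proposed fix, always forwarding with a request and acking the local broker only when the remote reply arrives, can be sketched with {{CompletableFuture}} standing in for ActiveMQ's {{FutureResponse}}/{{ResponseCallback}}. The class, its counters, and the {{remoteRequest}} function are hypothetical; the point is that a withheld response for a full queue leaves one future pending without stopping the forwarding of other messages.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of "always asyncRequest" forwarding. CompletableFuture
// stands in for FutureResponse/ResponseCallback; none of these types are ActiveMQ API.
final class AlwaysRequestForwarder {
    final AtomicInteger dequeueCounter = new AtomicInteger();
    final AtomicInteger outstandingResponses = new AtomicInteger();
    final AtomicInteger localAcks = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    private final Function<String, CompletableFuture<Void>> remoteRequest;

    AlwaysRequestForwarder(Function<String, CompletableFuture<Void>> remoteRequest) {
        this.remoteRequest = remoteRequest;
    }

    /** Forwards without looking at responseRequired; acks locally only after the remote reply. */
    CompletableFuture<Void> forward(String message) {
        outstandingResponses.incrementAndGet();
        return remoteRequest.apply(message).handle((ok, error) -> {
            try {
                if (error != null) {
                    failures.incrementAndGet();  // stands in for serviceLocalException(...)
                } else {
                    localAcks.incrementAndGet(); // stands in for localBroker.oneway(new MessageAck(...))
                    dequeueCounter.incrementAndGet();
                }
            } finally {
                outstandingResponses.decrementAndGet();
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> heldForQ2 = new CompletableFuture<>();
        AlwaysRequestForwarder bridge = new AlwaysRequestForwarder(
                msg -> msg.startsWith("Q2") ? heldForQ2 : CompletableFuture.<Void>completedFuture(null));
        bridge.forward("Q2:m1"); // Broker B withholds this response (queue full)
        bridge.forward("Q1:m1"); // still forwarded and acked; the caller never blocked
        System.out.println(bridge.localAcks.get() + " acked, " + bridge.outstandingResponses.get() + " outstanding");
        heldForQ2.complete(null); // space frees up on Broker B's Q2
        System.out.println(bridge.localAcks.get() + " acked, " + bridge.outstandingResponses.get() + " outstanding");
    }
}
```

Running {{main}} prints "1 acked, 1 outstanding" and then "2 acked, 0 outstanding": the Q1 message is acknowledged while the Q2 response is still withheld.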
> Alternatively (and unnecessary if the {{remoteBroker.oneway(message);}} call
> is removed), I've attached a patch that adds a {{remoteDispatchType}} field to
> org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses
> {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message,
> callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses
> {{remoteBroker.asyncRequest(message, callback);}} to forward all messages
> (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses
> {{remoteBroker.oneway(message);}} to forward all messages
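The three-valued option proposed in the patch can be sketched as an enum. The value names come from the list above; the {{useAsyncRequest}} method is hypothetical and is not the API that was committed (the resolution added the boolean {{alwaysSyncSend}} instead).

```java
// Hypothetical sketch of the proposed remoteDispatchType option. Only the three
// value names come from the patch description; the method is illustrative.
enum RemoteDispatchType {
    AUTO, ALWAYS_SYNC, ALWAYS_ASYNC;

    /** True when the bridge would forward with remoteBroker.asyncRequest(message, callback). */
    boolean useAsyncRequest(boolean responseRequired) {
        switch (this) {
            case ALWAYS_SYNC:
                return true;  // always wait for the remote response before acking locally
            case ALWAYS_ASYNC:
                return false; // always remoteBroker.oneway(message)
            case AUTO:
            default:
                return responseRequired; // pre-patch behaviour: follow the message's flag
        }
    }

    public static void main(String[] args) {
        for (RemoteDispatchType type : values()) {
            System.out.println(type + ": responseRequired=false -> "
                    + (type.useAsyncRequest(false) ? "asyncRequest" : "oneway"));
        }
    }
}
```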
> A unit test is also included which demonstrates the good/bad behaviour for
> all combinations of persistent/non-persistent and {{remoteDispatchType}}.
> Pay particular attention to the final assertions in the unit test: the test is
> designed to pass as-is, with its expectations modified to validate the bad
> behaviour where necessary. Ideally, the bad behaviour should cause test case
> failure if you consider blocking the entire network bridge to be a bug.