[ 
https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Tully resolved AMQ-3331.
-----------------------------

    Resolution: Fixed

Variant of patch applied with thanks in: 
http://svn.apache.org/viewvc?rev=1186813&view=rev

Added boolean attribute, {{alwaysSyncSend}} to the network connector. This 
allows the behavior for persistent messages to be applied to non-persistent 
messages.
It does not make sense to have an 'always async' mode as this could lead to 
lost persistent messages. As a result, the need for the enum goes away.

Also, I agree with the assessment of the use of the responseRequired flag in 
the choice of sync/async send: 
it should be based on the persistence attribute of the message, and I have 
fixed that.
This is vital so that messages sent in transactions are not sent async.
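
As a rough illustration of the rule described above (a sketch only, not the 
committed code; the class and method names are hypothetical and nothing here 
touches the real DemandForwardingBridgeSupport), the choice between a sync 
and an async forward reduces to the message's persistence attribute plus the 
{{alwaysSyncSend}} override:

```java
/**
 * Hypothetical stand-in for the bridge's send-mode decision.
 * "Sync" here means forwarding with a response callback (asyncRequest);
 * "async" means a fire-and-forget oneway send.
 */
final class BridgeSendModeSketch {

    /** Returns true when the bridge should wait for the remote broker's response. */
    static boolean useSyncSend(boolean alwaysSyncSend, boolean persistent) {
        // Persistent messages are always forwarded sync so they cannot be lost;
        // alwaysSyncSend extends the same treatment to non-persistent messages.
        return alwaysSyncSend || persistent;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean alwaysSyncSend : flags) {
            for (boolean persistent : flags) {
                System.out.println("alwaysSyncSend=" + alwaysSyncSend
                        + " persistent=" + persistent
                        + " -> " + (useSyncSend(alwaysSyncSend, persistent) ? "sync" : "async"));
            }
        }
    }
}
```

Note that there is no input combination that forces a persistent message 
onto the async path, which is why an 'always async' value is not needed.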

Thanks for the great test case, made life much easier.
                
> When a producer from a network bridge is blocked by producer flow control, 
> all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>            Assignee: Gary Tully
>             Fix For: 5.6.0
>
>         Attachments: AMQ-3331.patch, 
> NetworkBridgeProducerFlowControlPrePatchTest.java, 
> NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes 
> messages from two queues, Q1 and Q2.  Broker A is connected by a demand 
> forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 
> will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 
> consumer is slow), and this triggers producer flow control to halt new 
> messages being sent to Broker B's Q2 over the bridge.  Broker A's instances 
> of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour 
> over the course of the production of 1000 messages to both Q1/Q2, where 
> Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are 
> what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these 
> messages are forwarded to Broker B's Q1 without blocking, eventually being 
> consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these 
> messages are forwarded to Broker B's Q2 before producer flow control blocks 
> the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., 
> producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this 
> behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are 
> not what we expected, namely: producer flow control on Broker B's instance of 
> Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers 
> producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and counter-intuitive until you understand the 
> cause) that persistent messages should behave better than non-persistent 
> messages.  The same difference in behaviour can also be observed with 
> persistent messages if Broker A produces these outside a JMS transaction 
> (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves 
> appropriately, with Q1 independent of Q2, but inside behaves the same as the 
> non-persistent case with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding 
> producer flow control: {quote}As of ActiveMQ 5.0, we can now individually 
> flow control each producer on a shared connection without having to suspend 
> the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and 
> transactional/non-transactional) is due to the two ways that 
> org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>         + message.getProducerId() + ") stopped to prevent flooding "
>         + getActiveMQDestination().getQualifiedName() + "."
>         + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on 
> Broker B.  This TCP socket is the "remote" end of the A->B bridge and is 
> responsible for *sequentially* enqueueing to Broker B's queues all messages 
> from Broker A.  When a persistent, non-transactional message is sent to 
> Broker A's queues, it has +responseRequired=true+, which is preserved when 
> the bridge forwards the message to Broker B's queues.  If producer flow 
> control is triggered on Broker B's queue, the first method of producer flow 
> control will be used: the transport thread will not block, but the response 
> will be held back until the queue has room.  As a result, the transport 
> thread is free to continue enqueueing messages from the bridge, particularly 
> those destined for queues that are not full (NOTE: since the network prefetch 
> is 1, no new messages to the full queue will be forwarded until the response 
> is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, 
> it has +responseRequired=false+, which is preserved when the bridge forwards 
> the message to Broker B's queues.  If producer flow control is triggered on 
> Broker B's queue, the second method of producer flow control will be used: 
> the transport thread will be blocked.  As a result, no other messages from 
> the bridge will be forwarded, even those destined for queues that are not 
> full.
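
The head-of-line blocking described above can be reproduced with a small, 
deterministic model. This is only a sketch under simplifying assumptions: the 
class, enum and method names are hypothetical, there is no real threading or 
consumer, a queue's "capacity" stands in for its memory limit, and the bridge's 
prefetch is ignored. A single loop plays the role of Broker B's transport 
thread, and the two modes correspond to the two flow control paths in 
Queue#send(...).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TransportThreadSketch {

    /** The two ways a full queue can react to an incoming message. */
    enum FlowControl {
        BLOCK_TRANSPORT, // responseRequired=false: the calling thread waits for space
        HOLD_RESPONSE    // responseRequired=true: the response is delayed, the thread moves on
    }

    /**
     * Walks the incoming messages in order, as a single transport thread would,
     * and returns how many messages each destination accepted.
     */
    static Map<String, Integer> deliver(List<String> incoming,
                                        Map<String, Integer> capacity,
                                        FlowControl mode) {
        Map<String, Integer> enqueued = new LinkedHashMap<>();
        for (String destination : incoming) {
            int size = enqueued.getOrDefault(destination, 0);
            int limit = capacity.getOrDefault(destination, Integer.MAX_VALUE);
            if (size >= limit) {
                if (mode == FlowControl.BLOCK_TRANSPORT) {
                    break;    // the only transport thread is stuck; nothing else gets through
                }
                continue;     // message is parked until space frees up; keep servicing the socket
            }
            enqueued.put(destination, size + 1);
        }
        return enqueued;
    }

    public static void main(String[] args) {
        // Messages for Q1 and Q2 arrive interleaved over the one bridge connection.
        List<String> incoming = List.of("Q1", "Q2", "Q1", "Q2", "Q1", "Q2", "Q1", "Q2");
        Map<String, Integer> capacity = Map.of("Q2", 2); // only Q2 can fill up

        System.out.println(deliver(incoming, capacity, FlowControl.BLOCK_TRANSPORT)); // {Q1=3, Q2=2}
        System.out.println(deliver(incoming, capacity, FlowControl.HOLD_RESPONSE));   // {Q1=4, Q2=2}
    }
}
```

In the blocking mode Q1 stops receiving messages as soon as Q2 fills, even 
though Q1 has no limit; in the held-response mode Q1 receives everything.
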
> The preservation of the {{responseRequired}} flag occurs in 
> org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of 
> {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, 
> callback);}}
> Solution
> ========
> It seems odd that there should be any concern for the message's original 
> {{responseRequired}} flag.  Once the message is dispatched to the bridge for 
> forwarding, the original producer no longer cares and is not waiting for a 
> response.  Once a response is returned from the remote broker, it is only 
> used to trigger the message ACK to the local broker so that the inflight 
> and dequeue counts can be updated; neither the response nor the ACK 
> continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one 
> queue can have a serious side effect (blocking the bridge completely), I 
> think the best solution is to remove the logic from 
> DemandForwardingBridgeSupport that takes into account 
> {{message.isResponseRequired}} and simply always forwards the message with 
> {{remoteBroker.asyncRequest(message, callback);}}
> Alternatively (and unnecessarily if the {{remoteBroker.oneway(message);}} is 
> removed), I've attached a patch that adds a {{remoteDispatchType}} field to 
> org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses 
> {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, 
> callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses 
> {{remoteBroker.asyncRequest(message, callback);}} to forward all messages 
> (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses 
> {{remoteBroker.oneway(message);}} to forward all messages
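
For readers skimming the three values, the proposed selection can be 
sketched as follows. This is an illustration only, not the attached patch: 
the class and method names are hypothetical, and the boolean result merely 
indicates which of the two existing calls the bridge would make.

```java
final class RemoteDispatchSketch {

    /** The three values proposed for the remoteDispatchType field. */
    enum RemoteDispatchType { AUTO, ALWAYS_SYNC, ALWAYS_ASYNC }

    /**
     * Returns true when the bridge would call remoteBroker.asyncRequest(message, callback),
     * and false when it would call remoteBroker.oneway(message).
     */
    static boolean forwardWithCallback(RemoteDispatchType type, boolean responseRequired) {
        switch (type) {
            case ALWAYS_SYNC:
                return true;              // every message waits for the remote response
            case ALWAYS_ASYNC:
                return false;             // every message is fire-and-forget
            default:
                return responseRequired;  // AUTO: follow the message's original flag
        }
    }

    public static void main(String[] args) {
        for (RemoteDispatchType type : RemoteDispatchType.values()) {
            for (boolean responseRequired : new boolean[] {false, true}) {
                System.out.println(type + " responseRequired=" + responseRequired + " -> "
                        + (forwardWithCallback(type, responseRequired) ? "asyncRequest" : "oneway"));
            }
        }
    }
}
```
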
> A unit test is also included which demonstrates the good/bad behaviour for 
> all combinations of persistent/non-persistent and {{remoteDispatchType}}.  
> Pay particular attention to the final assertions in the unit test: the test 
> is designed to pass as-is by modifying the expectations to validate the bad 
> behaviour when necessary.  Ideally, the bad behaviour should cause test case 
> failure if you feel that blocking the entire network bridge is a bug.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
