[ https://issues.apache.org/jira/browse/AMQ-4138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-4138:
-------------------------------

    Attachment: BridgeMemoryLimitDeadlockTest.txt

Unit test that demonstrates the deadlock.  Since the test is based on a race 
condition, it is unlikely to fail unless TopicSubscription.acknowledge(...) is 
artificially delayed (see the comments in this ticket and the attached TopicSubscription.java).
                
> Network bridges can deadlock when memory limit exceeded
> -------------------------------------------------------
>
>                 Key: AMQ-4138
>                 URL: https://issues.apache.org/jira/browse/AMQ-4138
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.7.0
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: BridgeMemoryLimitDeadlockTest.txt, TopicSubscription.java
>
>
> Symptom
> =======
> We have a network of 4 brokers that share messages using distributed queues 
> via demand forwarding bridges.  We were validating the behaviour of the 
> system when memory usage approached and exceeded the out-of-box memory limit 
> (64MB).
> We discovered that the bridges would frequently appear to stop functioning 
> --- no messages were being produced or consumed.  We've experienced similar 
> behaviour when producer flow control is activated, but in these tests we had 
> turned producer flow control off (both to avoid bridges stalling due to flow 
> control and so that we could produce enough messages to exceed the memory 
> limit).
> The system never recovered from this deadlock.
> Cause
> =====
> We found a number of threads looping indefinitely with the following stack:
> {code}
> Daemon Thread [ActiveMQ VMTransport: vm://broker1#7-1] (Suspended)
>       owns: Topic  (id=109)
>       waiting for: Object  (id=110)
>       Object.wait(long) line: not available [native method]
>       TopicSubscription.add(MessageReference) line: 135
>       SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 48
>       Topic.dispatch(ConnectionContext, Message) line: 680
>       Topic.doMessageSend(ProducerBrokerExchange, Message) line: 491
>       Topic.send(ProducerBrokerExchange, Message) line: 427
>       ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 407
>       ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 503
>       ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 311
>       AdvisoryBroker.fireAdvisory(ConnectionContext, ActiveMQTopic, Command, ConsumerId, ActiveMQMessage) line: 551
>       AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command, ConsumerId) line: 500
>       AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command) line: 486
>       AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 98
>       CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>       TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>       BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
>       ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 562
>       ConsumerInfo.visit(CommandVisitor) line: 332
>       ManagedTransportConnection(TransportConnection).service(Command) line: 294
>       TransportConnection$1.onCommand(Object) line: 152
>       ResponseCorrelator.onCommand(Object) line: 116
>       MutexTransport.onCommand(Object) line: 50
>       VMTransport.iterate() line: 241
>       PooledTaskRunner.runTask() line: 129
>       PooledTaskRunner$1.run() line: 47
>       ThreadPoolExecutor$Worker.runTask(Runnable) line: 886
>       ThreadPoolExecutor$Worker.run() line: 908
>       Thread.run() line: 662
> {code}
> The spinning threads were associated with the VMTransport TaskRunner from 
> {{DemandForwardingBridgeSupport.localBroker}}.  Since the TaskRunner was 
> essentially blocked processing one message, all other messages being 
> forwarded from the remote end of the bridge (e.g., ACKs) were getting queued, 
> but not processed, which made the bridge appear to be stalled.
> The message being processed by the spinning thread was a ConsumerInfo 
> representing a demand subscription from the remote broker, and was being 
> forwarded to a subscription on the associated consumer advisory topic.  The 
> subscription was waiting for memory to become available in the 
> {{matchedListMutex}} loop:
> {code:title=TopicSubscription.java}
>     public void add(MessageReference node) throws Exception {
>         if (isDuplicate(node)) {
>             return;
>         }
>         enqueueCounter.incrementAndGet();
>         if (!isFull() && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>             dispatch(node);
>             setSlowConsumer(false);
>         } else {
>             if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
>                 // Slow consumers should log and set their state as such.
>                 if (!isSlowConsumer()) {
>                     LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
>                     setSlowConsumer(true);
>                     for (Destination dest: destinations) {
>                         dest.slowConsumer(getContext(), this);
>                     }
>                 }
>             }
>             if (maximumPendingMessages != 0) {
>                 boolean warnedAboutWait = false;
>                 while (active) {
>                     synchronized (matchedListMutex) {
>                         while (matched.isFull()) {
>                             if (getContext().getStopping().get()) {
>                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
>                                         + node.getMessageId());
>                                 enqueueCounter.decrementAndGet();
>                                 return;
>                             }
>                             if (!warnedAboutWait) {
>                                 LOG.info(toString() + ": Pending message cursor [" + matched
>                                         + "] is full, temp usage ("
>                                         + +matched.getSystemUsage().getTempUsage().getPercentUsage()
>                                         + "%) or memory usage ("
>                                         + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
>                                         + "%) limit reached, blocking message add() pending the release of resources.");
>                                 warnedAboutWait = true;
>                             }
>                             matchedListMutex.wait(20);
>                         }
>                         // Temporary storage could be full - so just try to add the message
>                         // see https://issues.apache.org/activemq/browse/AMQ-2475
>                         if (matched.tryAddMessageLast(node, 10)) {
>                             break;
>                         }
>                     }
>                 }
> {code}
> The {{matched.isFull()}} check that was keeping the thread in the loop is 
> only cleared once the memory usage is reduced:
> {code:title=AbstractPendingMessageCursor.java}
> public boolean isFull() {
>     return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
> }
> {code}
> Since the looping thread is essentially stalling the VMTransport on the local 
> side of the bridge, no dispatch ACKs can be processed for messages sent from 
> the local bridge to the remote broker.  If all consumers are on the remote 
> broker and ACKs are not being processed, then memory usage on the local 
> broker is never reduced, thus creating a deadlock.
> In order for {{TopicSubscription.add(...)}} to enter the (essentially 
> infinite) loop, the following condition must *not* hold:
> {code:title=TopicSubscription.java}
>     public void add(MessageReference node) throws Exception {
>         if (isDuplicate(node)) {
>             return;
>         }
>         enqueueCounter.incrementAndGet();
>         if (!isFull() && matched.isEmpty()  && !isSlave()) {
> {code}
> {{isFull()}} is true if the number of unacknowledged messages dispatched to 
> the subscription is greater than the subscription's prefetch size.  So for 
> the deadlock to occur, two things must happen:
> # There must be multiple consumers being added to the queue, thus generating 
> multiple adds to the consumer advisory topic subscription
> # There must be a delay in processing ACKs to the topic subscription so that 
> it becomes full
> For reasons to do with distribution management, our network connectors have a 
> prefetch size of 1, so under load, the deadlock occurs easily.
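> For illustration, a network connector configured this way looks roughly like 
> the following.  This is a minimal sketch, not code from the attached test; the 
> broker names, the vm:// URI and the 64MB limit are assumptions:
> {code:title=Broker1Config.java (sketch)}
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.network.NetworkConnector;
> 
> public class Broker1Config {
>     // Sketch only: mirrors the configuration described above.
>     // broker2 would be configured symmetrically to make the bridge bidirectional.
>     public static BrokerService createBroker1() throws Exception {
>         BrokerService broker = new BrokerService();
>         broker.setBrokerName("broker1");
>         broker.setPersistent(false);
>         // Out-of-box 64MB memory limit.
>         broker.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);
> 
>         // Disable producer flow control so producers can push memory usage past the limit.
>         PolicyEntry policy = new PolicyEntry();
>         policy.setProducerFlowControl(false);
>         PolicyMap policyMap = new PolicyMap();
>         policyMap.setDefaultEntry(policy);
>         broker.setDestinationPolicy(policyMap);
> 
>         // Demand-forwarding bridge to broker2 with a network prefetch of 1.
>         NetworkConnector bridge = broker.addNetworkConnector("static:(vm://broker2)");
>         bridge.setPrefetchSize(1);
> 
>         broker.start();
>         return broker;
>     }
> }
> {code}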
> I've attached a test case that clearly demonstrates the deadlock.  The test 
> case is simple (a rough sketch of the message flow follows the list):
> # Two brokers (broker1 and broker2) are bidirectionally bridged with a 
> network prefetch of 1
> # broker1 (with producer flow control disabled) produces enough messages to 
> a test queue so that the memory limit is exceeded
> # broker2 starts two consumers of the test queue, and the broker1->broker2 
> bridge forwards two demand subscriptions to broker1
> # broker1 processes the demand subscriptions and starts dispatching messages 
> to broker2
> # Since broker2 has a bridge back to broker1, broker1's processing of the 
> demand subscriptions generates two consumer advisory messages that are sent 
> over the consumer advisory topic to broker2 (of course, broker2 ignores them 
> since they represent its own consumers)
> # As messages are dispatched to broker2's instance of the test queue, ACKs 
> are forwarded by the broker1->broker2 bridge and processed by broker1, 
> reducing the memory usage
> # Eventually all messages are consumed by broker2 and broker1's memory usage 
> is 0.
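> A rough sketch of the JMS side of this flow is shown below; the class name, 
> queue name and payload sizes are assumptions, the real test is in the attached 
> BridgeMemoryLimitDeadlockTest.txt, and both brokers are assumed to be already 
> running as embedded vm:// brokers (e.g., wired as in the earlier sketch):
> {code:title=BridgeDeadlockSketch.java (sketch)}
> import javax.jms.BytesMessage;
> import javax.jms.Connection;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.command.ActiveMQQueue;
> 
> public class BridgeDeadlockSketch {
>     public static void main(String[] args) throws Exception {
>         Queue testQueue = new ActiveMQQueue("TEST.QUEUE");
> 
>         // Producer on broker1: flood the queue until the 64MB memory limit is exceeded.
>         Connection producerConnection = new ActiveMQConnectionFactory("vm://broker1").createConnection();
>         producerConnection.start();
>         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         MessageProducer producer = producerSession.createProducer(testQueue);
>         byte[] payload = new byte[64 * 1024];
>         for (int i = 0; i < 2000; i++) {          // roughly 128MB of payload, well past the limit
>             BytesMessage message = producerSession.createBytesMessage();
>             message.writeBytes(payload);
>             producer.send(message);
>         }
> 
>         // Two consumers on broker2: each one causes a demand subscription (and a
>         // consumer advisory) to be forwarded to broker1 over the bridge.
>         Connection consumerConnection = new ActiveMQConnectionFactory("vm://broker2").createConnection();
>         consumerConnection.start();
>         Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         MessageConsumer consumer1 = consumerSession.createConsumer(testQueue);
>         MessageConsumer consumer2 = consumerSession.createConsumer(testQueue);
> 
>         // When the race described above occurs, the bridge stalls and these
>         // consumers stop receiving; otherwise they drain the queue and
>         // broker1's memory usage returns to 0.
>         while (consumer1.receive(5000) != null || consumer2.receive(5000) != null) {
>             // keep draining
>         }
>     }
> }
> {code}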
> This test case generally passes since the deadlock requires a specific race 
> condition: namely, the first consumer advisory message needs to be "in 
> flight" when the second consumer advisory message is sent.  Since the network 
> prefetch is 1, when the second advisory message is processed, the topic 
> subscription is "full", and the thread sending the advisory will wait for 
> matched.isFull() to be false.
> In order to increase the chance that the first consumer advisory message is 
> "in flight", simply add a small sleep to TopicSubscription's acknowledge method:
> {code:title=TopicSubscription.java}
> public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
>     LOG.info("Acknowledge subscription to " + ack.getDestination().getPhysicalName());
>     Thread.sleep(1000);
> {code}
> The sleep increases the window and pretty much guarantees that the test case 
> will fail (i.e., messages remain in broker1's test queue since the bridge is 
> stalled).
> Even with the sleep in place, if the number of consumers on broker2 is 
> reduced to 1, the test case will pass.  Again, this is because at least two 
> consumer advisory messages are needed to fill the subscription prefetch.
> The use of prefetch=1 for the network connector is simply so that the unit 
> test can demonstrate the deadlock with 2 consumers.  The deadlock can occur 
> with any prefetch if the number of consumers is at least prefetch + 1.
> This is a serious issue for us as our deployment involves many networked 
> brokers and a lot of dynamic consumer creation, which produces frequent 
> consumer advisory messages.  When a network bridge locks up, it cripples our 
> system.
> Solution
> ========
> The essential problem is that sending a consumer advisory message to a topic 
> can take an indefinite amount of time (i.e., waits indefinitely until memory 
> is available), and during this time, no other messages sent to the 
> VMTransport are processed.
> The principal tenet of the TaskRunners used in AMQ is that they implement 
> "cooperative multi-tasking"; this means that task execution has to be 
> reasonably quick so that other tasks can be run.  A task that runs 
> indefinitely breaks the utility of the model.
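> As a toy illustration (plain JDK code, deliberately not ActiveMQ's TaskRunner 
> API), a single-threaded executor shows why one blocking task starves 
> everything queued behind it:
> {code:title=CooperativeTaskDemo.java (sketch)}
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> 
> public class CooperativeTaskDemo {
>     public static void main(String[] args) {
>         // Stands in for the single TaskRunner serving the bridge's local VMTransport.
>         ExecutorService transport = Executors.newSingleThreadExecutor();
>         final CountDownLatch memoryAvailable = new CountDownLatch(1);
> 
>         // Task 1: the consumer advisory dispatch, blocked waiting for memory.
>         transport.submit(new Runnable() {
>             public void run() {
>                 try {
>                     memoryAvailable.await(); // never released in the deadlock scenario
>                 } catch (InterruptedException e) {
>                     Thread.currentThread().interrupt();
>                 }
>             }
>         });
> 
>         // Task 2: the ACK that would release memory.  It never runs, because the
>         // only thread is stuck in task 1; this is the deadlock in miniature.
>         transport.submit(new Runnable() {
>             public void run() {
>                 memoryAvailable.countDown();
>                 System.out.println("ACK processed, memory released");
>             }
>         });
>         // The program hangs at this point, just like the stalled bridge.
>     }
> }
> {code}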
> While it would be possible to use {{TopicSubscription.maximumPendingMessages}} 
> to prevent the {{matchedListMutex}} loop from being entered, this would result 
> in the consumer advisory message being discarded, and thus the loss of any 
> demand subscriptions that would have resulted --- so it's not an option.
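> (For reference, the rejected workaround would be applied roughly as follows; 
> the mapping from the policy limit to {{maximumPendingMessages}} is my 
> assumption, and, as noted above, it trades the block for dropped advisories.)
> {code:title=PendingLimitWorkaround.java (sketch)}
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> 
> public class PendingLimitWorkaround {
>     // Sketch of the rejected workaround.  Assumption: a constant pending-message
>     // limit of 0 maps to TopicSubscription.maximumPendingMessages == 0, so add()
>     // skips the matchedListMutex loop; the cost is that matched messages
>     // (including consumer advisories) are not kept, as noted above.
>     public static void applyTo(BrokerService broker) {
>         ConstantPendingMessageLimitStrategy limit = new ConstantPendingMessageLimitStrategy();
>         limit.setLimit(0); // illustrative value only
> 
>         PolicyEntry entry = new PolicyEntry();
>         entry.setPendingMessageLimitStrategy(limit);
> 
>         PolicyMap policyMap = new PolicyMap();
>         policyMap.setDefaultEntry(entry);
>         broker.setDestinationPolicy(policyMap);
>     }
> }
> {code}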
> Unfortunately, without understanding the system further, I can't offer a 
> specific solution.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
