[ https://issues.apache.org/jira/browse/AMQ-4138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-4138:
-------------------------------

    Description: 
Symptom
=======
We have a network of 4 brokers that share messages using distributed queues via 
demand forwarding bridges.  We were validating the behaviour of the system when 
memory usage approached and exceeded the out-of-box memory limit (64MB).

We discovered that with great frequency the bridges would appear to stop 
functioning --- no messages were being produced or consumed.  We've experienced 
similar behaviour when producer flow control is activated, but in our tests, 
we'd turned producer flow control off (both to avoid bridges stalling due to 
producer flow control and so that we could produce enough messages to exceed 
the memory limit).

The system would never recover from this deadlock. 

Cause
=====
We found a number of threads looping indefinitely with the following stack:

{code}
Daemon Thread [ActiveMQ VMTransport: vm://broker1#7-1] (Suspended)
        owns: Topic  (id=103)
        waiting for: Object  (id=104)
        Object.wait(long) line: not available [native method]
        TopicSubscription.add(MessageReference) line: 136
        SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 48
        Topic.dispatch(ConnectionContext, Message) line: 680
        Topic.doMessageSend(ProducerBrokerExchange, Message) line: 491
        Topic.send(ProducerBrokerExchange, Message) line: 427
        ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 407
        ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 503
        ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 311
        AdvisoryBroker.fireAdvisory(ConnectionContext, ActiveMQTopic, Command, ConsumerId, ActiveMQMessage) line: 551
        AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command, ConsumerId) line: 500
        AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command) line: 486
        AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 98
        CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
        TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
        BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
        ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 562
        ConsumerInfo.visit(CommandVisitor) line: 332
        ManagedTransportConnection(TransportConnection).service(Command) line: 294
        TransportConnection$1.onCommand(Object) line: 152
        ResponseCorrelator.onCommand(Object) line: 116
        MutexTransport.onCommand(Object) line: 50
        VMTransport.iterate() line: 241
        PooledTaskRunner.runTask() line: 129
        PooledTaskRunner$1.run() line: 47
        ThreadPoolExecutor$Worker.runTask(Runnable) line: 886
        ThreadPoolExecutor$Worker.run() line: 908
        Thread.run() line: 662
{code}

The spinning threads were associated with the VMTransport TaskRunner from 
{{DemandForwardingBridgeSupport.localBroker}}.  Since the TaskRunner was 
essentially blocked processing one message, all other messages being forwarded 
from the remote end of the bridge (e.g., ACKs) were getting queued, but not 
processed, which made the bridge appear to be stalled.
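
For illustration only (this is not ActiveMQ code), a plain single-threaded 
executor shows the effect: one command that never completes starves everything 
queued behind it.

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic illustration of why a single task runner that blocks on one command
// stalls every command queued behind it on the same transport.
public class SerialTransportStall {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService taskRunner = Executors.newSingleThreadExecutor();

        // First command: stands in for the ConsumerInfo whose advisory dispatch
        // is stuck in TopicSubscription.add() waiting for memory to be freed.
        taskRunner.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // spin, like the matched.isFull()/matchedListMutex.wait(20) loop
            }
        });

        // Second command: stands in for a MessageAck forwarded over the bridge.
        // It is accepted but never runs, so memory usage is never reduced.
        taskRunner.execute(() -> System.out.println("ACK processed"));  // never prints

        Thread.sleep(2000);        // give the "ACK" a chance to run (it will not)
        taskRunner.shutdownNow();  // interrupt the spinning task so the JVM can exit
    }
}
{code}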

The message being processed by the spinning thread was a ConsumerInfo 
representing a demand subscription from the remote broker, and was being 
forwarded to a subscription on the associated consumer advisory topic.  The 
subscription was waiting for memory to become available in the 
{{matchedListMutex}} loop:

{code:title=TopicSubscription.java}
    public void add(MessageReference node) throws Exception {
        if (isDuplicate(node)) {
            return;
        }
        enqueueCounter.incrementAndGet();
        if (!isFull() && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);
            setSlowConsumer(false);
        } else {
            if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
                // Slow consumers should log and set their state as such.
                if (!isSlowConsumer()) {
                    LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
                    setSlowConsumer(true);
                    for (Destination dest: destinations) {
                        dest.slowConsumer(getContext(), this);
                    }
                }
            }
            if (maximumPendingMessages != 0) {
                boolean warnedAboutWait = false;
                while (active) {
                    synchronized (matchedListMutex) {
                        while (matched.isFull()) {
                            if (getContext().getStopping().get()) {
                                LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
                                        + node.getMessageId());
                                enqueueCounter.decrementAndGet();
                                return;
                            }
                            if (!warnedAboutWait) {
                                LOG.info(toString() + ": Pending message cursor [" + matched
                                        + "] is full, temp usage ("
                                        + +matched.getSystemUsage().getTempUsage().getPercentUsage()
                                        + "%) or memory usage ("
                                        + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
                                        + "%) limit reached, blocking message add() pending the release of resources.");
                                warnedAboutWait = true;
                            }
                            matchedListMutex.wait(20);
                        }
                        // Temporary storage could be full - so just try to add the message
                        // see https://issues.apache.org/activemq/browse/AMQ-2475
                        if (matched.tryAddMessageLast(node, 10)) {
                            break;
                        }
                    }
                }
{code}

The {{matched.isFull()}} check that was keeping the thread in the loop is only 
cleared once the memory usage is reduced:

{code:title=AbstractPendingMessageCursor.java}
public boolean isFull() {
    return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
}
{code}

Since the looping thread is essentially stalling the VMTransport on the local 
side of the bridge, no dispatch ACKs can be processed for messages sent from 
the local bridge to the remote broker.  If all consumers are on the remote 
broker and ACKs are not being processed, then memory usage on the local broker 
is never reduced, thus creating a deadlock.
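
To make that dependency concrete, here is a minimal sketch (not part of the 
attached test) using ActiveMQ's {{org.apache.activemq.usage.MemoryUsage}}, 
which is what {{AbstractPendingMessageCursor.isFull()}} consults: the cursor 
stays "full" until usage is decreased, and in this scenario the decrease is 
driven only by ACKs that are stuck behind the looping thread.

{code}
import org.apache.activemq.usage.MemoryUsage;

// Minimal illustration of the memory-usage dependency; the limit and values are
// arbitrary, chosen only to show isFull() flipping.
public class MemoryUsageSketch {
    public static void main(String[] args) throws Exception {
        MemoryUsage memory = new MemoryUsage();
        memory.setLimit(64 * 1024);        // small limit, illustration only
        memory.start();

        memory.increaseUsage(128 * 1024);  // producers push usage past the limit
        System.out.println(memory.isFull());   // true  -> matched.isFull() keeps add() looping

        // In the deadlock this never runs: the decrease would be triggered by ACK
        // processing, which is queued behind the looping thread on the same VMTransport.
        memory.decreaseUsage(128 * 1024);
        System.out.println(memory.isFull());   // false -> the add() loop could exit

        memory.stop();
    }
}
{code}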

In order for {{TopicSubscription.add(...)}} to enter the infinite loop, the 
following guard must evaluate to false:
{code:title=TopicSubscription.java}
if (!isFull() && matched.isEmpty()  && !isSlave()) {
{code}

{{isFull()}} is true once the number of unacknowledged messages dispatched to 
the subscription reaches the subscription's prefetch size (a simplified sketch 
of this check follows the list below).  So for the deadlock to occur, two 
things must happen:

# There must be multiple consumers being added to the queue, thus generating 
multiple adds to the consumer advisory topic subscription
# There must be a delay in processing ACKs to the topic subscription so that it 
becomes full
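
For illustration, here is a simplified, hypothetical version of that "full" 
check (the names are illustrative, not the exact ActiveMQ fields), traced 
through our prefetch=1 scenario:

{code}
// Simplified sketch of the "subscription is full" test described above.
// Not the actual TopicSubscription source; names are illustrative.
public class PrefetchFullSketch {
    public static void main(String[] args) {
        int prefetchSize = 1;      // network connector prefetch in our setup
        long dispatched  = 1;      // first consumer advisory, dispatched to the bridge
        long acked       = 0;      // its ACK is stuck behind the looping thread

        // The subscription reports "full" once dispatched-but-unacked messages
        // reach the prefetch size, so the second advisory cannot be dispatched
        // and TopicSubscription.add() falls into the matchedListMutex wait loop.
        boolean full = (dispatched - acked) >= prefetchSize;
        System.out.println("isFull = " + full);   // prints: isFull = true
    }
}
{code}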

For reasons to do with distribution management, our network connectors have a 
prefetch size of 1, so under load, the deadlock occurs easily.

I've attached a test case that clearly demonstrates the deadlock.  The test 
case is simple (a rough sketch of the setup follows the steps below):

# Two brokers (broker1 and broker2) are bidirectionally bridged with a network 
prefetch of 1
# broker1 (with producer flow control disabled) produces enough messages to a 
test queue so that the memory limit is exceeded
# broker2 starts two consumers of the test queue, and the broker1->broker2 
bridge forwards two demand subscriptions to broker1
# broker1 processes the demand subscriptions and starts dispatching messages to 
broker2
# Since broker2 has a bridge back to broker1, broker1's processing of the 
demand subscriptions generates two consumer advisory messages that are sent 
over the consumer advisory topic to broker2 (of course, broker2 ignores them 
since they represent its own consumers)
# As messages are dispatched to broker2's instance of the test queue, ACKs are 
forwarded by the broker1->broker2 bridge and processed by broker1, reducing the 
memory usage
# Eventually all messages are consumed by broker2 and broker1's memory usage is 
0.
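
For reference, here is a rough, hypothetical sketch of that setup using the 
embedded {{BrokerService}} API (this is not the attached test case; broker 
names, ports and limits are illustrative):

{code}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;

// Sketch of the topology described in the steps above: two brokers bridged in
// both directions with a network prefetch of 1, producer flow control disabled
// and a small memory limit.
public class BridgeDeadlockSetupSketch {

    static BrokerService createBroker(String name, String bindUri, String otherUri) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(name);
        broker.setPersistent(false);
        broker.addConnector(bindUri);

        // Out-of-the-box style memory limit (64MB), easy to exceed under load.
        broker.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);

        // Disable producer flow control so producers can push usage past the limit.
        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(false);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(policyMap);

        // Demand forwarding bridge to the other broker with a network prefetch of 1.
        NetworkConnector nc = broker.addNetworkConnector("static:(" + otherUri + ")");
        nc.setPrefetchSize(1);

        broker.start();
        return broker;
    }

    public static void main(String[] args) throws Exception {
        BrokerService broker1 = createBroker("broker1", "tcp://localhost:61616", "tcp://localhost:61617");
        BrokerService broker2 = createBroker("broker2", "tcp://localhost:61617", "tcp://localhost:61616");

        // Steps 2-3 would follow here: a JMS producer floods a test queue on broker1
        // past the memory limit, then two consumers on broker2 create two demand
        // subscriptions (and therefore two consumer advisories) back on broker1.

        broker2.stop();
        broker1.stop();
    }
}
{code}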

This test case generally passes since the deadlock requires a specific race 
condition: namely, the first consumer advisory message needs to be "in flight" 
when the second consumer advisory message is sent.  Since the network prefetch 
is 1, when the second advisory message is processed, the topic subscription is 
"full", and the thread sending the advisory will wait for matched.isFull() to 
be false.

In order to increase the chance that the first consumer advisory message is "in 
flight", simply add a small sleep to TopicSubscription's acknowledge method:

{code:title=TopicSubscription.java}
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
    LOG.info("Acknowledge subscription to " + ack.getDestination().getPhysicalName());
    Thread.sleep(1000);
{code}

The sleep increases the window and pretty much guarantees that the test case 
will fail (i.e., messages remain in broker1's test queue since the bridge is 
stalled).

The use of prefetch=1 for the network connector is simply so that the unit test 
can demonstrate the deadlock with 2 consumers.  The deadlock can occur with any 
prefetch if the number of consumers is prefetch + 1.



> Network bridges can deadlock when memory limit exceeded
> -------------------------------------------------------
>
>                 Key: AMQ-4138
>                 URL: https://issues.apache.org/jira/browse/AMQ-4138
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.7.0
>            Reporter: Stirling Chow
>            Priority: Critical
>