[ 
https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timothy Bish closed AMQ-3127.
-----------------------------

    Resolution: Cannot Reproduce

Ran the test repeatedly and could not reproduce using the latest trunk code.
                
> Network bridge causes deadlock on queue/topic when message dispatch and 
> consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over 
> HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The 
> brokers share a number of topics and queues.  Periodically, we have a 
> catastrophic (cause still unknown) network outage that only affects the 
> outbound bridges from one of the 7 brokers.  The affected broker detects the 
> outage, stops the existing 6 outbound bridges, and starts 6 new outbound 
> bridges.  Frequently, we find that the outbound bridges appear to be 
> recreated properly, but messages produced by the affected broker to *some* of 
> its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
>
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of 
> the BrokerService threads, which was dispatching a message across an outbound 
> bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was 
> receiving a new subscription from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)
>       owns: Object  (id=104)
>       owns: Object  (id=105)
>       owns: Object  (id=106)
>       owns: Queue$3  (id=107)
>       waiting for: Object  (id=108)
>               owned by: Daemon Thread [VMTransport] (Running)
>       MutexTransport.oneway(Object) line: 40
>       ResponseCorrelator.oneway(Object) line: 60
>       DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738
>       DemandForwardingBridgeSupport$2.onCommand(Object) line: 161
>       ResponseCorrelator.onCommand(Object) line: 116
>       MutexTransport(TransportFilter).onCommand(Object) line: 69
>       VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122
>       VMTransport.oneway(Object) line: 113
>       MutexTransport.oneway(Object) line: 40
>       ResponseCorrelator.oneway(Object) line: 60
>       ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249
>       ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810
>       ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770
>       QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649
>       QueueSubscription(PrefetchSubscription).dispatchPending() line: 599
>       QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156
>       Queue.doActualDispatch(List<QueueMessageReference>) line: 1798
>       Queue.doDispatch(List<QueueMessageReference>) line: 1745
>       Queue.pageInMessages(boolean) line: 1898
>       Queue.iterate() line: 1425
>       PooledTaskRunner.runTask() line: 122
>       PooledTaskRunner$1.run() line: 43
>       ThreadPoolExecutor$Worker.runTask(Runnable) line: 886
>       ThreadPoolExecutor$Worker.run() line: 908
>       Thread.run() line: 662
> Daemon Thread [VMTransport] (Suspended)
>       owns: Object  (id=499)
>       owns: RegionBroker$1  (id=205)
>               waited by: Daemon Thread [VMTransport] (Running)
>               waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)
>       owns: Object  (id=108)
>               waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)
>       owns: URI  (id=500)
>       Unsafe.park(boolean, long) line: not available [native method]
>       LockSupport.park(Object) line: 158
>       ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811
>       ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842
>       ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178
>       ReentrantReadWriteLock$WriteLock.lock() line: 807
>       Queue.addSubscription(ConnectionContext, Subscription) line: 360
>       ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290
>       ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444
>       ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240
>       AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>       AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91
>       CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>       TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>       BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
>       ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550
>       ConsumerInfo.visit(CommandVisitor) line: 349
>       
> Specifically, a message had been produced to one of the shared queues and was 
> being dispatched to a remote consumer by the BrokerService thread.  In so 
> doing, BrokerService had acquired the pagedInPendingDispatchLock lock from 
> Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
>       
> BrokerService had sent the message to the remote broker and was then 
> acknowledging the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then 
> acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired 
> Queue.pagedInPendingDispatchLock and was trying to acquire 
> MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same 
> outbound bridge through which the aforementioned dispatch was occurring.  The 
> bridge's remote transport listener thread (in this example, VMTransport) was 
> adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired 
> MutexTransport.writeMutex.  Registration of consumers to a queue is 
> synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired 
> MutexTransport.writeMutex and was trying to acquire 
> Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
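The lock-order inversion described above can be reproduced in isolation. The following is only a minimal sketch using plain JDK classes, not ActiveMQ code: `dispatchLock` and `writeMutex` are stand-ins for Queue.pagedInPendingDispatchLock and MutexTransport.writeMutex, and the class, method, and thread names are hypothetical. A latch forces both threads to hold their first lock before requesting the second, so the timing window is always hit.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderInversionDemo {

    /** Returns true if the JVM reports deadlocked threads within timeoutMillis. */
    public static boolean reproduceDeadlock(long timeoutMillis) {
        // Stand-ins (assumptions) for Queue.pagedInPendingDispatchLock
        // and MutexTransport.writeMutex.
        final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
        final Object writeMutex = new Object();
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Plays the BrokerService task: dispatch lock first, then transport mutex.
        Thread dispatcher = new Thread(() -> {
            dispatchLock.writeLock().lock();
            try {
                arriveAndWait(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // would send the MessageAck through the local transport
                }
            } finally {
                dispatchLock.writeLock().unlock();
            }
        }, "dispatcher");

        // Plays the transport listener: transport mutex first, then dispatch lock.
        Thread listener = new Thread(() -> {
            synchronized (writeMutex) {
                arriveAndWait(bothHoldFirstLock);
                dispatchLock.writeLock().lock(); // never acquired once deadlocked
                dispatchLock.writeLock().unlock();
            }
        }, "listener");

        // Daemon threads so the stuck pair does not keep the JVM alive.
        dispatcher.setDaemon(true);
        listener.setDaemon(true);
        dispatcher.start();
        listener.start();

        // Poll the JVM's own deadlock detector (monitors and ownable synchronizers).
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (System.currentTimeMillis() < deadline) {
                if (threads.findDeadlockedThreads() != null) {
                    return true;
                }
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    private static void arriveAndWait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + reproduceDeadlock(5000L));
    }
}
```

Without the latch the two threads would usually miss each other, which matches the intermittent nature of the failure reported in production.
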
>
> Solution
> ========
> Deadlock can be avoided by making the local transport asynchronous, which 
> would allow the remote transport listener thread to acquire the 
> MutexTransport.writeMutex, but then offload the acquisition of 
> Queue.pagedInPendingDispatchLock to its peer listening thread.  We've 
> included a unit test that passes with this change.
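The effect of the proposed change can be sketched with the same two stand-in locks. This is an illustration under assumptions, not the attached patch: a single-thread executor plays the role of the asynchronous transport's peer thread, and all names are hypothetical. The listener still takes `writeMutex`, but hands the subscription work to the peer instead of acquiring `dispatchLock` itself, so it never holds one lock while waiting for the other.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class AsyncHandoffDemo {

    /** Returns true if dispatch and subscription both finish within timeoutMillis. */
    public static boolean completesWithAsyncHandoff(long timeoutMillis) {
        // Stand-ins (assumptions) for Queue.pagedInPendingDispatchLock
        // and MutexTransport.writeMutex.
        final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
        final Object writeMutex = new Object();
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        final CountDownLatch done = new CountDownLatch(2); // dispatch + subscription

        // Hypothetical peer thread standing in for the async transport's reader.
        final ExecutorService peer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "peer");
            t.setDaemon(true);
            return t;
        });

        Thread dispatcher = new Thread(() -> {
            dispatchLock.writeLock().lock();
            try {
                arriveAndWait(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // would send the MessageAck through the local transport
                }
            } finally {
                dispatchLock.writeLock().unlock();
            }
            done.countDown();
        }, "dispatcher");

        Thread listener = new Thread(() -> {
            synchronized (writeMutex) {
                arriveAndWait(bothHoldFirstLock);
                // Enqueue the subscription and return; the peer takes the
                // dispatch lock later, without holding writeMutex.
                peer.execute(() -> {
                    dispatchLock.writeLock().lock();
                    try {
                        // would register the subscription on the queue
                    } finally {
                        dispatchLock.writeLock().unlock();
                    }
                    done.countDown();
                });
            }
        }, "listener");

        dispatcher.setDaemon(true);
        listener.setDaemon(true);
        dispatcher.start();
        listener.start();

        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            peer.shutdown();
        }
    }

    private static void arriveAndWait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + completesWithAsyncHandoff(5000L));
    }
}
```

The trade-off of the handoff is that the subscription is registered some time after the listener returns, so ordering guarantees then depend on the peer thread rather than on the caller.
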
> There is no clear reason why the local transport is synchronous.  This is 
> enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rationale was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : 
> /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
>
> Addendum
> ========
> We've included a unit test that demonstrates the deadlock 100% of the time on 
> our systems.  Since this is a timing issue, you may need to run the unit test 
> several times to create the deadlock.  Also note that three specific 
> configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there 
> can be an existing subscription across the bridge to which messages are being 
> dispatched while at the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that 
> message subscriptions are dispatched by the same thread that accesses the 
> queue.
> 3) The message producers must be transactionalized; this is so that the 
> message dispatches require a response by the dispatch thread (i.e., 
> BrokerService).
> If any of these conditions is not present, deadlock (at least through this 
> recreation) does not occur.        
> Through further testing 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
