[ 
https://issues.apache.org/jira/browse/AMQ-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669136#comment-13669136
 ] 

Yuriy Sidelnikov edited comment on AMQ-4561 at 5/29/13 10:24 AM:
-----------------------------------------------------------------

This issue can be fixed in version 5.7.0 with the following patch:
--- DemandForwardingBridgeSupport.java  
+++ DemandForwardingBridgeSupport.java  
@@ -137,6 +137,7 @@
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
+    private static final long BROKER_INFO_DELIVERING_TIMEOUT = 10; // seconds
 
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration = configuration;
@@ -487,7 +488,10 @@
                                 }
                                 break;
                             case ConsumerInfo.DATA_STRUCTURE_TYPE:
-                                localStartedLatch.await();
+                                if (!localStartedLatch.await(BROKER_INFO_DELIVERING_TIMEOUT, TimeUnit.SECONDS))
+                                {
+                                    throw new Exception("ConsumerInfo message from a remote broker arrived before the BrokerInfo message. Transport will be reconnected");
+                                }
                                 if (started.get()) {
                                     if (!addConsumerInfo((ConsumerInfo) command)) {
                                         if (LOG.isDebugEnabled()) {
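
The timed wait used in the patch can be tried outside ActiveMQ. The following is only a minimal sketch: the class TimedLatchSketch and its methods onBrokerInfo/onConsumerInfo are hypothetical names made up for illustration and are not part of ActiveMQ. The only real API used is the JDK's CountDownLatch.await(long, TimeUnit), which returns false when the timeout elapses before the latch is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedLatchSketch {
    // Hypothetical stand-in for localStartedLatch in the bridge.
    private final CountDownLatch localStartedLatch = new CountDownLatch(1);

    // Simulates the point where BrokerInfo processing releases the latch.
    void onBrokerInfo() {
        localStartedLatch.countDown();
    }

    // Simulates ConsumerInfo processing: waits a bounded time for the latch
    // and fails loudly instead of blocking forever.
    void onConsumerInfo(long timeout, TimeUnit unit) throws InterruptedException {
        if (!localStartedLatch.await(timeout, unit)) {
            throw new IllegalStateException("ConsumerInfo arrived before BrokerInfo");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedLatchSketch bridge = new TimedLatchSketch();
        try {
            // No BrokerInfo yet: the timed wait expires and an exception is thrown.
            bridge.onConsumerInfo(100, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("Timed out: " + e.getMessage());
        }
        // After BrokerInfo the same call returns immediately.
        bridge.onBrokerInfo();
        bridge.onConsumerInfo(100, TimeUnit.MILLISECONDS);
        System.out.println("ConsumerInfo processed after BrokerInfo");
    }
}
```

In the patch the failure is surfaced as an exception so that the transport gets reconnected; the sketch only shows the latch behaviour, not the reconnect.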

                

                  
> Deadlock in DemandForwardingBridgeSupport class if ConsumerInfo message has 
> arrived before BrokerInfo message.
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4561
>                 URL: https://issues.apache.org/jira/browse/AMQ-4561
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.6.0, 5.7.0
>            Reporter: Yuriy Sidelnikov
>              Labels: broker, deadlock
>
> To establish a connection, a spoke broker sends the following special
> messages to the hub:
> •     BrokerInfo
> •     ConnectionInfo
> •     SessionInfo
> •     ProducerInfo
> •     ConsumerInfo
> In return, the hub sends exactly the same set of messages:
> •     BrokerInfo
> •     ConnectionInfo
> •     SessionInfo
> •     ProducerInfo
> •     ConsumerInfo
>  
> All messages are sent asynchronously, so message order is not guaranteed.
>  
> However, the BrokerInfo message has to be delivered to the spoke broker
> BEFORE the ConsumerInfo message because:
> 1. ConsumerInfo processing depends on information provided in the BrokerInfo
> message, so the spoke broker blocks on a latch until BrokerInfo has been
> processed.
> 2. All incoming messages are processed in the same thread, including reading
> from the tcp/nio stream.
> Short excerpt from the DemandForwardingBridgeSupport class:
> private void startLocalBridge() throws Throwable {   // BrokerInfo message processing
> …..
>     localStartedLatch.countDown();
>             }
> The ConsumerInfo message processing part waits on that latch:
> case ConsumerInfo.DATA_STRUCTURE_TYPE:
>                                 localStartedLatch.await();
> So if the ConsumerInfo message is received before the BrokerInfo message, the
> consuming thread deadlocks:
> Name: ActiveMQ NIO Worker 13
> State: WAITING on java.util.concurrent.CountDownLatch$Sync@150b06b
> Total blocked: 1  Total waited: 3
> Stack trace: 
>  sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:534)
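
The ordering problem described above can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not ActiveMQ code: SingleThreadOrderingSketch and process are hypothetical names, and commands are modelled as plain strings. One thread handles every command in arrival order, so when ConsumerInfo comes first nobody is left to release the latch. A timed await is used here so that the demo terminates; with an untimed await() the thread would wait forever, as in the stack trace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {
    // Processes commands sequentially on the calling thread, the way a single
    // transport worker thread would. Returns "ok" if every ConsumerInfo found
    // the latch released, or "stuck" if a ConsumerInfo had to wait in vain.
    static String process(List<String> commands, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        for (String command : commands) {
            if (command.equals("BrokerInfo")) {
                // BrokerInfo processing releases the latch.
                started.countDown();
            } else if (command.equals("ConsumerInfo")) {
                // The same thread would have to process BrokerInfo later,
                // so waiting here can never succeed if BrokerInfo is behind us.
                if (!started.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return "stuck";
                }
            }
        }
        return "ok";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(Arrays.asList("BrokerInfo", "ConsumerInfo"), 100));
        System.out.println(process(Arrays.asList("ConsumerInfo", "BrokerInfo"), 100));
    }
}
```

The first call prints "ok" and the second prints "stuck", which corresponds to the WAITING worker thread shown in the dump.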
