[ https://issues.apache.org/jira/browse/AMQ-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13669136#comment-13669136 ]
Yuriy Sidelnikov edited comment on AMQ-4561 at 5/29/13 10:24 AM:
-----------------------------------------------------------------

This issue can be fixed for version 5.7.0 by the patch:

--- DemandForwardingBridgeSupport.java
+++ DemandForwardingBridgeSupport.java
@@ -137,6 +137,7 @@
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
+    private long BROKER_INFO_DELIVERING_TIMEOUT = 10;
 
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration = configuration;
@@ -487,7 +488,10 @@
                 }
                 break;
             case ConsumerInfo.DATA_STRUCTURE_TYPE:
-                localStartedLatch.await();
+                if ( !localStartedLatch.await(BROKER_INFO_DELIVERING_TIMEOUT,TimeUnit.SECONDS) )
+                {
+                    throw new Exception("ConsumerInfo message from a remote broker has been arrived before BrokerInfo message. Transport will be reconnected");
+                }
                 if (started.get()) {
                     if (!addConsumerInfo((ConsumerInfo) command)) {
                         if (LOG.isDebugEnabled()) {

> Deadlock in DemandForwardingBridgeSupport class if ConsumerInfo message has
> arrived before BrokerInfo message.
> --------------------------------------------------------------------------
>
> Key: AMQ-4561
> URL: https://issues.apache.org/jira/browse/AMQ-4561
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.6.0, 5.7.0
> Reporter: Yuriy Sidelnikov
> Labels: broker, deadlock
>
> To establish a connection, a spoke broker sends some special messages to the
> hub:
> • BrokerInfo
> • ConnectionInfo
> • SessionInfo
> • ProducerInfo
> • ConsumerInfo
> In return the hub sends exactly the same set of messages:
> • BrokerInfo
> • ConnectionInfo
> • SessionInfo
> • ProducerInfo
> • ConsumerInfo
>
> All messages are sent asynchronously, so message order is not guaranteed.
>
> However, the BrokerInfo message has to be delivered BEFORE the ConsumerInfo
> message to the spoke broker because:
> 1. ConsumerInfo processing logic depends on some information provided in the
> BrokerInfo message, so the spoke broker side blocks on a latch until that
> message has been processed.
> 2. All incoming messages are processed in the same thread, including reading
> from the tcp/nio stream.
> Short excerpt from DemandForwardingBridgeSupport class:
> private void startLocalBridge() throws Throwable { // BrokerInfo message processing
>     …..
>     localStartedLatch.countDown();
> }
> And the ConsumerInfo message processing part waits on that latch:
> case ConsumerInfo.DATA_STRUCTURE_TYPE:
>     localStartedLatch.await();
> So if the ConsumerInfo message is received before the BrokerInfo message, the
> consuming thread deadlocks:
> Name: ActiveMQ NIO Worker 13
> State: WAITING on java.util.concurrent.CountDownLatch$Sync@150b06b
> Total blocked: 1 Total waited: 3
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:534)
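
The ordering problem described in the report can be reproduced in isolation with a small sketch. This is not ActiveMQ code: the class name LatchOrderingSketch, the Command enum, and the process method are hypothetical stand-ins, and the sketch assumes (as the report states) that both commands are handled sequentially on one thread. It uses the timed CountDownLatch.await(long, TimeUnit), which is the same call the patch switches to, so that the out-of-order case returns instead of hanging.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchOrderingSketch {

    // Hypothetical stand-ins for the two command types discussed in the report.
    enum Command { BROKER_INFO, CONSUMER_INFO }

    /**
     * Handles the commands one after another on the calling thread, the way a
     * single transport reader thread would, and reports what happened.
     */
    static String process(List<Command> commands, long timeoutMillis) {
        CountDownLatch localStartedLatch = new CountDownLatch(1);
        try {
            for (Command command : commands) {
                switch (command) {
                    case BROKER_INFO:
                        // Plays the role of startLocalBridge() releasing the latch.
                        localStartedLatch.countDown();
                        break;
                    case CONSUMER_INFO:
                        // An untimed await() here would never return if BROKER_INFO
                        // is still queued behind this command on the same thread.
                        if (!localStartedLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                            return "timed out waiting for BrokerInfo";
                        }
                        break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
        return "all commands processed";
    }

    public static void main(String[] args) {
        // Expected order: BrokerInfo first, so the latch is already open.
        System.out.println(process(Arrays.asList(Command.BROKER_INFO, Command.CONSUMER_INFO), 100));
        // Reversed order: the only thread that could open the latch is the one waiting on it.
        System.out.println(process(Arrays.asList(Command.CONSUMER_INFO, Command.BROKER_INFO), 100));
    }
}
```

Running main prints "all commands processed" for the expected order and "timed out waiting for BrokerInfo" for the reversed one. With the untimed await() the second call would block indefinitely, which matches the WAITING state in the thread dump above.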