[ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish closed AMQ-3127.
-----------------------------
Resolution: Cannot Reproduce
Ran the test repeatedly and could not reproduce using the latest trunk code.
> Network bridge causes deadlock on queue/topic when message dispatch and
> consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3127
> URL: https://issues.apache.org/jira/browse/AMQ-3127
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.4.2
> Reporter: Stirling Chow
> Priority: Critical
> Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over
> HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent. The
> brokers share a number of topics and queues. Periodically, we have a
> catastrophic (cause still unknown) network outage that only affects the
> outbound bridges from one of the 7 brokers. The affected broker detects the
> outage, stops the existing 6 outbound bridges, and starts 6 new outbound
> bridges. Frequently, we find that the outbound bridges appear to be
> recreated properly, but messages produced by the affected broker to *some* of
> its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of
> the BrokerService threads, which was dispatching a message across an outbound
> bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was
> receiving a new subscription from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)
>     owns: Object (id=104)
>     owns: Object (id=105)
>     owns: Object (id=106)
>     owns: Queue$3 (id=107)
>     waiting for: Object (id=108)
>         owned by: Daemon Thread [VMTransport] (Running)
>     MutexTransport.oneway(Object) line: 40
>     ResponseCorrelator.oneway(Object) line: 60
>     DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738
>     DemandForwardingBridgeSupport$2.onCommand(Object) line: 161
>     ResponseCorrelator.onCommand(Object) line: 116
>     MutexTransport(TransportFilter).onCommand(Object) line: 69
>     VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122
>     VMTransport.oneway(Object) line: 113
>     MutexTransport.oneway(Object) line: 40
>     ResponseCorrelator.oneway(Object) line: 60
>     ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249
>     ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810
>     ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770
>     QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649
>     QueueSubscription(PrefetchSubscription).dispatchPending() line: 599
>     QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156
>     Queue.doActualDispatch(List<QueueMessageReference>) line: 1798
>     Queue.doDispatch(List<QueueMessageReference>) line: 1745
>     Queue.pageInMessages(boolean) line: 1898
>     Queue.iterate() line: 1425
>     PooledTaskRunner.runTask() line: 122
>     PooledTaskRunner$1.run() line: 43
>     ThreadPoolExecutor$Worker.runTask(Runnable) line: 886
>     ThreadPoolExecutor$Worker.run() line: 908
>     Thread.run() line: 662
> Daemon Thread [VMTransport] (Suspended)
>     owns: Object (id=499)
>     owns: RegionBroker$1 (id=205)
>         waited by: Daemon Thread [VMTransport] (Running)
>         waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)
>     owns: Object (id=108)
>         waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)
>     owns: URI (id=500)
>     Unsafe.park(boolean, long) line: not available [native method]
>     LockSupport.park(Object) line: 158
>     ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811
>     ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842
>     ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178
>     ReentrantReadWriteLock$WriteLock.lock() line: 807
>     Queue.addSubscription(ConnectionContext, Subscription) line: 360
>     ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290
>     ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444
>     ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240
>     AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>     AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91
>     CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>     TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>     BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
>     ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550
>     ConsumerInfo.visit(CommandVisitor) line: 349
>
> Specifically, a message had been produced to one of the shared queues and was
> being dispatched to a remote consumer by the BrokerService thread. In so
> doing, BrokerService had acquired the pagedInPendingDispatchLock lock from
> Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
>
> BrokerService had sent the message to the remote broker and was then
> acknowledging it over the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>         ...
>         if (!message.isResponseRequired()) {
>
>             // If the message was originally sent using async
>             // send, we will preserve that QOS
>             // by bridging it using an async send (small chance
>             // of message loss).
>             try {
>                 remoteBroker.oneway(message);
>                 localBroker.oneway(new MessageAck(md,
>                     MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then
> acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired
> Queue.pagedInPendingDispatchLock and was trying to acquire
> MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same
> outbound bridge through which the aforementioned dispatch was occurring. The
> bridge's remote transport listener thread (in this example, VMTransport) was
> adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired
> MutexTransport.writeMutex. Registration of consumers to a queue is
> synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub)
>             throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired
> MutexTransport.writeMutex and was trying to acquire
> Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
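
The lock-order inversion described above can be illustrated outside ActiveMQ. The following is a minimal sketch, not broker code: `writeMutex` and `dispatchLock` are hypothetical stand-ins for MutexTransport.writeMutex and Queue.pagedInPendingDispatchLock, and the class and thread names are invented for illustration. It assumes a JVM on which `ThreadMXBean.findDeadlockedThreads()` is supported (it throws UnsupportedOperationException otherwise).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderDeadlockSketch {
    // Hypothetical stand-ins for the two locks named in the report.
    static final Object writeMutex = new Object();
    static final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();

    /** Runs two daemon threads that take the locks in opposite order; true if the JVM reports a deadlock. */
    static boolean reproduce() {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Plays the dispatching thread: dispatch lock first, then the transport mutex.
        Thread dispatcher = new Thread(() -> {
            dispatchLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) { /* would send the ack here */ }
            } finally {
                dispatchLock.writeLock().unlock();
            }
        }, "dispatcher");

        // Plays the transport listener: transport mutex first, then the dispatch lock.
        Thread listener = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                dispatchLock.writeLock().lock(); // would register the subscription here
                dispatchLock.writeLock().unlock();
            }
        }, "listener");

        dispatcher.setDaemon(true); // daemon threads so the JVM can still exit
        listener.setDaemon(true);
        dispatcher.start();
        listener.start();

        // Poll for up to about five seconds for a cycle covering monitors and ownable synchronizers.
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        try {
            for (int i = 0; i < 100; i++) {
                long[] ids = mx.findDeadlockedThreads();
                if (ids != null && ids.length >= 2) {
                    return true;
                }
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + reproduce());
    }
}
```

The latch forces the worst-case interleaving, in which each thread already holds its first lock before either requests its second. In the broker the same interleaving would depend on timing.
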
> Solution
> ========
> Deadlock can be avoided by making the local transport asynchronous, which
> would allow the remote transport listener thread to acquire the
> MutexTransport.writeMutex, but then offload the acquisition of
> Queue.pagedInPendingDispatchLock to its peer listening thread. We've
> included a unit test that passes with this change.
> There is no clear reason why the local transport is synchronous. This is
> enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
>         ....
>         URI uri = getVmConnectorURI();
>         Map<String, String> map = new HashMap<String,
>             String>(URISupport.parseParameters(uri));
>         map.put("network", "true");
>         map.put("async", "false");
> This change was made by the following checkin, but no rationale was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified :
> /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> ========
> We've included a unit test that demonstrates the deadlock 100% of the time on
> our systems. Since this is a timing issue, you may need to run the unit test
> several times to create the deadlock. Also note that three specific
> configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there
> can be an existing subscription across the bridge to which messages are being
> dispatched while at the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that
> messages are dispatched to subscriptions by the same thread that accesses the
> queue.
> 3) The message producers must be transactional; this is so that the
> message dispatches require a response by the dispatch thread (i.e.,
> BrokerService).
> If any of these conditions is not present, deadlock (at least through this
> recreation) does not occur.
>
> Through further testing
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira