[ 
https://issues.apache.org/jira/browse/AMQ-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544650#comment-16544650
 ] 

Jamie goodyear commented on AMQ-7011:
-------------------------------------

I've made a PR for this card:
[https://github.com/apache/activemq/pulls]
https://github.com/apache/activemq/pull/287

> Activemq 5.15.4 Stomp protocol allowed to enter deadlock via dispatch sync
> --------------------------------------------------------------------------
>
>                 Key: AMQ-7011
>                 URL: https://issues.apache.org/jira/browse/AMQ-7011
>             Project: ActiveMQ
>          Issue Type: Bug
>            Reporter: Jamie goodyear
>            Priority: Major
>
> Activemq 5.15.4 Stomp protocol allowed to enter deadlock via dispatch sync.
> Scenario:
> Stomp client setting the following:
> header.put("id", subId);
> header.put("activemq.dispatchAsync", "false");
> The lock setup between TopicSubscription and MutexTransport while using 
> Stomp in sync mode can result in a deadlock, as found below (Add and Destroy 
> call processing). Each lock is marked with + or * to show the lock order in 
> each stack trace.
> 
> Found one Java-level deadlock:
> =============================
> "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613":
>   waiting to lock monitor 0x00007f9c565d4d28 (object 0x00000007acc44708, a 
> java.lang.Object),
>   which is held by "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613"
> "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613":
>   waiting for ownable synchronizer 0x00000007ac872730, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613"
> Java stack information for the threads listed above:
> ===================================================
> "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613":
> ++++++  at 
> org.apache.activemq.broker.region.TopicSubscription.destroy(TopicSubscription.java:757)
>     - waiting to lock <0x00000007acc44708> (a java.lang.Object)
>     at 
> org.apache.activemq.broker.region.AbstractRegion.destroySubscription(AbstractRegion.java:488)
>     at 
> org.apache.activemq.broker.jmx.ManagedTopicRegion.destroySubscription(ManagedTopicRegion.java:52)
>     at 
> org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:480)
>     at 
> org.apache.activemq.broker.region.TopicRegion.removeConsumer(TopicRegion.java:206)
>     at 
> org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:429)
>     at 
> org.apache.activemq.broker.jmx.ManagedRegionBroker.removeConsumer(ManagedRegionBroker.java:258)
>     at 
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at 
> org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:352)
>     at 
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at 
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at 
> org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at 
> org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:729)
>     at 
> org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:768)
>     at 
> org.apache.activemq.broker.TransportConnection.processRemoveConnection(TransportConnection.java:879)
>     - locked <0x00000007ac999f00> (a 
> org.apache.activemq.broker.jmx.ManagedTransportConnection)
>     at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:73)
>     at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)
>     at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)
> *****   at 
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>     at 
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>     at 
> org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompDisconnect(ProtocolConverter.java:838)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:267)
>     at 
> org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
>     at 
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at 
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>     at 
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>     at java.lang.Thread.run(Thread.java:748)
> "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613":
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007ac872730> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>     at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>     at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> *****   at 
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
>     at 
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1486)
>     at 
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971)
>     at 
> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:927)
>     at 
> org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:715)
> +++++   at 
> org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:121)
>     - locked <0x00000007acc44708> (a java.lang.Object)
>     at 
> org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
>     at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:775)
>     at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:556)
>     - locked <0x00000007acbd4e08> (a org.apache.activemq.broker.region.Topic)
>     at org.apache.activemq.broker.region.Topic.send(Topic.java:484)
>     at 
> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:505)
>     at 
> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:459)
>     at 
> org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
>     at 
> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
>     at 
> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:293)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
>     at 
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:572)
>     at 
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)
>     at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)
>     at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)
>     at 
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>     at 
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>     at 
> org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:345)
>     at 
> org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:248)
>     at 
> org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
>     at 
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at 
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>     at 
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>     at java.lang.Thread.run(Thread.java:748)
> In reviewing how MQTT and AMQP were handled, I found that both set 
> "consumerInfo.setDispatchAsync(true);":
> AMQPSession line 376
> MQTTDefaultSubscriptionStrategy line 61
> For the MQTT fix, see the cards below:
> ENTMQ-1504
> https://issues.jboss.org/browse/ENTMQ-1504?_sscc=t
> AMQ-5290
> https://issues.apache.org/jira/browse/AMQ-5290
> https://github.com/apache/activemq/blame/master/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java#L61
>
> I believe we can fix this for Stomp in ProtocolConverter, line 614, by also 
> setting consumerInfo.setDispatchAsync(true). This would bring Stomp into 
> alignment with the other protocols and how they were fixed.
> I'll be providing a patch with the above change shortly.
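
For readers who want to see the lock-order inversion in isolation, here is a
minimal sketch. It is not ActiveMQ code: the class name LockOrderSketch, the
method dispatchBlocked, and the two lock variables are hypothetical stand-ins
for the "+" monitor in TopicSubscription and the "*" ReentrantLock in
MutexTransport. The dispatching thread uses tryLock with a timeout instead of
lock() so the demo terminates rather than hanging the way the broker threads
did.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderSketch {

    /** Returns true if the dispatching thread could not obtain the transport mutex. */
    static boolean dispatchBlocked() throws InterruptedException {
        // Hypothetical stand-ins for the two locks in the traces.
        final Object subscriptionMonitor = new Object();           // "+" lock
        final ReentrantLock transportMutex = new ReentrantLock();  // "*" lock
        // Makes sure each thread holds its first lock before trying the second.
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        final AtomicBoolean blocked = new AtomicBoolean(false);

        // Mirrors the disconnect path: transport mutex first, then the monitor.
        Thread remover = new Thread(() -> {
            transportMutex.lock();
            try {
                bothHoldFirstLock.countDown();
                bothHoldFirstLock.await();
                synchronized (subscriptionMonitor) {
                    // subscription teardown would happen here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                transportMutex.unlock();
            }
        });

        // Mirrors the sync dispatch path: monitor first, then the transport mutex.
        Thread dispatcher = new Thread(() -> {
            synchronized (subscriptionMonitor) {
                try {
                    bothHoldFirstLock.countDown();
                    bothHoldFirstLock.await();
                    // A plain lock() here would wait forever; tryLock lets the demo end.
                    if (transportMutex.tryLock(200, TimeUnit.MILLISECONDS)) {
                        transportMutex.unlock();
                    } else {
                        blocked.set(true);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        remover.start();
        dispatcher.start();
        remover.join();
        dispatcher.join();
        return blocked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("dispatch blocked on transport mutex: " + dispatchBlocked());
    }
}
```

Because each thread holds one lock while waiting for the other, the timed
tryLock cannot succeed until the dispatcher gives up and releases its monitor.
With an untimed lock(), as in the traces above, neither thread would ever
proceed. Dispatching asynchronously avoids the cycle because the message is
then written by a different thread that does not hold the subscription
monitor.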



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
