[ https://issues.apache.org/jira/browse/AMQ-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544650#comment-16544650 ]

Jamie goodyear commented on AMQ-7011:
-------------------------------------

I've made a PR for this card:

[https://github.com/apache/activemq/pulls]

https://github.com/apache/activemq/pull/287

> Activemq 5.15.4 Stomp protocol allowed to enter deadlock via dispatch sync
> --------------------------------------------------------------------------
>
>                 Key: AMQ-7011
>                 URL: https://issues.apache.org/jira/browse/AMQ-7011
>             Project: ActiveMQ
>          Issue Type: Bug
>            Reporter: Jamie goodyear
>            Priority: Major
>
> ActiveMQ 5.15.4 Stomp protocol allowed to enter deadlock via dispatch sync.
> Scenario:
> Stomp client setting the following:
> header.put("id", subId);
> header.put("activemq.dispatchAsync", "false");
> The setup of locks between TopicSubscription and MutexTransport while using
> Stomp in sync mode can result in a deadlock, as shown below (add and destroy
> call processing). Each lock is identified by a + or * to show the lock order
> in each stack trace.
> Found one Java-level deadlock:
> =============================
> "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613":
>   waiting to lock monitor 0x00007f9c565d4d28 (object 0x00000007acc44708, a java.lang.Object),
>   which is held by "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613"
> "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613":
>   waiting for ownable synchronizer 0x00000007ac872730, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613"
> Java stack information for the threads listed above:
> ===================================================
> "ActiveMQ Transport: tcp:///127.0.0.1:58303@61613":
> ++++++ at org.apache.activemq.broker.region.TopicSubscription.destroy(TopicSubscription.java:757)
>     - waiting to lock <0x00000007acc44708> (a java.lang.Object)
>     at org.apache.activemq.broker.region.AbstractRegion.destroySubscription(AbstractRegion.java:488)
>     at org.apache.activemq.broker.jmx.ManagedTopicRegion.destroySubscription(ManagedTopicRegion.java:52)
>     at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:480)
>     at org.apache.activemq.broker.region.TopicRegion.removeConsumer(TopicRegion.java:206)
>     at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:429)
>     at org.apache.activemq.broker.jmx.ManagedRegionBroker.removeConsumer(ManagedRegionBroker.java:258)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:352)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:139)
>     at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:729)
>     at org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:768)
>     at org.apache.activemq.broker.TransportConnection.processRemoveConnection(TransportConnection.java:879)
>     - locked <0x00000007ac999f00> (a org.apache.activemq.broker.jmx.ManagedTransportConnection)
>     at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:73)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)
> ***** at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>     at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.onStompDisconnect(ProtocolConverter.java:838)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:267)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>     at java.lang.Thread.run(Thread.java:748)
> "ActiveMQ Transport: tcp:///127.0.0.1:58302@61613":
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007ac872730> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>     at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>     at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> ***** at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
>     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1486)
>     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971)
>     at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:927)
>     at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:715)
> +++++ at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:121)
>     - locked <0x00000007acc44708> (a java.lang.Object)
>     at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
>     at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:775)
>     at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:556)
>     - locked <0x00000007acbd4e08> (a org.apache.activemq.broker.region.Topic)
>     at org.apache.activemq.broker.region.Topic.send(Topic.java:484)
>     at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:505)
>     at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:459)
>     at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
>     at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
>     at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:293)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
>     at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:572)
>     at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)
>     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>     at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:345)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:248)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>     at java.lang.Thread.run(Thread.java:748)
> In reviewing how MQTT and AMQP were handled, both set
> "consumerInfo.setDispatchAsync(true);"
> AMQPSession line 376
> MQTTDefaultSubscriptionStrategy line 61
> For the MQTT fix, see the cards below:
> ENTMQ-1504
> https://issues.jboss.org/browse/ENTMQ-1504?_sscc=t
> AMQ-5290
> https://issues.apache.org/jira/browse/AMQ-5290
> https://github.com/apache/activemq/blame/master/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java#L61
>
> I believe we can fix this for Stomp in ProtocolConverter, line 614, by also
> setting consumerInfo.setDispatchAsync(true); This would bring Stomp into
> alignment with the other protocols and how they were fixed.
> I'll be providing a patch with the above change shortly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
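
Editorial sketch of the lock ordering in the two stack traces above. This is not ActiveMQ code: the class, method, and variable names are hypothetical, both locks are modeled as ReentrantLock (the real subscription lock is a synchronized monitor), and timed tryLock calls are used so the example terminates instead of hanging. The "async" branch assumes that asynchronous dispatch amounts to handing the transport write to another thread after the subscription lock is released, which is a simplification.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class DispatchLockOrderSketch {

    // Stand-in for the transport write (MutexTransport.oneway in the trace).
    static boolean writeToTransport(ReentrantLock transportMutex) throws InterruptedException {
        if (!transportMutex.tryLock(1, TimeUnit.SECONDS)) {
            return false; // would have blocked forever with a plain lock()
        }
        transportMutex.unlock();
        return true;
    }

    // Returns true only if both the "destroy" and the "add" path finish.
    static boolean bothPathsComplete(boolean dispatchAsync) throws Exception {
        ReentrantLock transportMutex = new ReentrantLock();    // "*" lock in the trace
        ReentrantLock subscriptionMutex = new ReentrantLock(); // "+" lock in the trace
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Destroy path: transport mutex first, then the subscription lock.
            Callable<Boolean> destroyPath = () -> {
                transportMutex.lock();
                try {
                    bothHoldFirstLock.countDown();
                    bothHoldFirstLock.await();
                    if (!subscriptionMutex.tryLock(1, TimeUnit.SECONDS)) {
                        return false;
                    }
                    subscriptionMutex.unlock();
                    return true;
                } finally {
                    transportMutex.unlock();
                }
            };
            // Add path: subscription lock first, then the transport mutex.
            Callable<Boolean> addPath = () -> {
                Future<Boolean> handedOff = null;
                boolean wroteInline = false;
                subscriptionMutex.lock();
                try {
                    bothHoldFirstLock.countDown();
                    bothHoldFirstLock.await();
                    if (dispatchAsync) {
                        // Queue the write; it runs without the subscription lock held.
                        Callable<Boolean> write = () -> writeToTransport(transportMutex);
                        handedOff = pool.submit(write);
                    } else {
                        // Sync dispatch: write while still holding the subscription lock.
                        wroteInline = writeToTransport(transportMutex);
                    }
                } finally {
                    subscriptionMutex.unlock();
                }
                return handedOff == null ? wroteInline : handedOff.get();
            };
            Future<Boolean> destroy = pool.submit(destroyPath);
            Future<Boolean> add = pool.submit(addPath);
            boolean destroyOk = destroy.get();
            boolean addOk = add.get();
            return destroyOk && addOk;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("dispatchAsync=false, both paths complete: " + bothPathsComplete(false));
        System.out.println("dispatchAsync=true, both paths complete: " + bothPathsComplete(true));
    }
}
```

With dispatchAsync=false the two paths take the locks in opposite orders while each still holds its first lock, so at least one timed acquisition has to give up. With plain lock() and synchronized, as in the broker, neither would ever return.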