Randy Prager created AMQ-5190:
---------------------------------
Summary: deadlock between producer and consumer
Key: AMQ-5190
URL: https://issues.apache.org/jira/browse/AMQ-5190
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.5.0
Environment: Linux, Spring, Java 6
Reporter: Randy Prager
We observed a deadlock in the broker, see the 2 below.
# we use spring to configure the broker, producer and consumer
# producer and consumer share the same pooled connection factory
# we have producer flow control enabled
h2. Deadlock
{noformat}
"incommingMessageListenerContainer-1" prio=10 tid=0x00007f5a7d0d8800 nid=0xdae5
waiting for monitor entry [0x00007f5a00cef000]
java.lang.Thread.State: BLOCKED (on object monitor)
at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
- waiting to lock <0x00000000c43cf2a8> (a java.lang.Object)
at
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
at
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
at
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
at
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
at
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
at
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
- locked <0x00000000c5f111e8> (a java.util.LinkedList)
at
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:585)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1051)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:948)
at java.lang.Thread.run(Thread.java:662)
{noformat}
{noformat}
"default-selector" prio=10 tid=0x00007f5a7cf35000 nid=0xdae8 waiting on
condition [0x00007f5a009eb000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c4354548> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
at
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0x00000000c43cf2a8> (a java.lang.Object)
at
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
at
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1744)
- locked <0x00000000c6343788> (a java.lang.Object)
at
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
- locked <0x00000000c62e6548> (a
org.apache.activemq.ActiveMQMessageProducer)
at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
at
org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:536)
at
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
{noformat}
h2. Producer Flow Control Thread
{noformat}
"ActiveMQ Task-1" daemon prio=10 tid=0x00007f5a7d06f800 nid=0xdae0 in
Object.wait() [0x00007f5a011f4000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.activemq.usage.Usage.waitForSpace(Usage.java:97)
- locked <0x00000000c60c6130> (a java.lang.Object)
at
org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:589)
at
org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:573)
at org.apache.activemq.broker.region.Topic.send(Topic.java:379)
at
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
at
org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
at
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:458)
at
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at
org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:218)
at
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
at
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
{noformat}
h2. Spring Bits
{code:xml}
<amq:broker useJmx="false" persistent="false">
<amq:destinationPolicy>
<amq:policyMap>
<amq:policyEntries>
<amq:policyEntry topic=">" producerFlowControl="false"
memoryLimit="20mb">
<amq:dispatchPolicy>
<amq:strictOrderDispatchPolicy />
</amq:dispatchPolicy>
<amq:pendingSubscriberPolicy>
<amq:fileCursor />
</amq:pendingSubscriberPolicy>
</amq:policyEntry>
</amq:policyEntries>
</amq:policyMap>
</amq:destinationPolicy>
<amq:systemUsage>
<amq:systemUsage>
<amq:memoryUsage>
<amq:memoryUsage limit="100 mb"/>
</amq:memoryUsage>
<amq:storeUsage>
<amq:storeUsage limit="100 gb"/>
</amq:storeUsage>
<amq:tempUsage>
<amq:tempUsage limit="100 gb"/>
</amq:tempUsage>
</amq:systemUsage>
</amq:systemUsage>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="vm://localhost" />
<property name="useAsyncSend" value="true" />
<property name="optimizeAcknowledge"
value="false" />
<property
name="warnAboutUnstartedConnectionTimeout" value="500" />
</bean>
</property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory"
ref="pooledConnectionFactory" />
</bean>
<bean id="DBTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="db.topic" />
</bean>
<bean id="DBProducer" class="db.DBProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="DBTopic" />
</bean>
<bean id="DBConsumer" class="db.DBConsumer" />
<!-- <jms:listener-container
connection-factory="pooledConnectionFactory"> -->
<!-- <jms:listener destination="db.topic" ref="DBConsumer"
method="messageReceived"
/> -->
<!-- </jms:listener-container> -->
<bean id="incommingMessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory"
ref="pooledConnectionFactory" />
<property name="destination" ref="DBTopic" />
<property name="messageListener" ref="DBConsumer" />
<!-- Retry connection every 10 seconds. -->
<property name="recoveryInterval" value="10000" />
</bean>
{code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)