Randy Prager created AMQ-5190:
---------------------------------

             Summary: deadlock between producer and consumer
                 Key: AMQ-5190
                 URL: https://issues.apache.org/jira/browse/AMQ-5190
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.5.0
         Environment: Linux, Spring, Java 6
            Reporter: Randy Prager


We observed a deadlock in the broker, see the 2 below.

# we use spring to configure the broker, producer and consumer
# producer and consumer share the same pooled connection factory
# we have producer flow control enabled

h2. Deadlock

{noformat}
"incommingMessageListenerContainer-1" prio=10 tid=0x00007f5a7d0d8800 nid=0xdae5 
waiting for monitor entry [0x00007f5a00cef000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at 
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
        - waiting to lock <0x00000000c43cf2a8> (a java.lang.Object)
        at 
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at 
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
        at 
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
        at 
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
        at 
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
        at 
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
        at 
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
        - locked <0x00000000c5f111e8> (a java.util.LinkedList)
        at 
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:585)
        at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
        at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
        at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
        at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
        at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1051)
        at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:948)
        at java.lang.Thread.run(Thread.java:662)
{noformat}


{noformat}
"default-selector" prio=10 tid=0x00007f5a7cf35000 nid=0xdae8 waiting on 
condition [0x00007f5a009eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c4354548> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at 
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:94)
        at 
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x00000000c43cf2a8> (a java.lang.Object)
        at 
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at 
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
        at 
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1744)
        - locked <0x00000000c6343788> (a java.lang.Object)
        at 
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
        - locked <0x00000000c62e6548> (a 
org.apache.activemq.ActiveMQMessageProducer)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
        at 
org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:536)
        at 
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
{noformat}

h2. Producer Flow Control Thread

{noformat}
"ActiveMQ Task-1" daemon prio=10 tid=0x00007f5a7d06f800 nid=0xdae0 in 
Object.wait() [0x00007f5a011f4000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.activemq.usage.Usage.waitForSpace(Usage.java:97)
        - locked <0x00000000c60c6130> (a java.lang.Object)
        at 
org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:589)
        at 
org.apache.activemq.broker.region.BaseDestination.waitForSpace(BaseDestination.java:573)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:379)
        at 
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
        at 
org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:523)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at 
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at 
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at 
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at 
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:458)
        at 
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
        at 
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:306)
        at 
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at 
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at 
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at 
org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:218)
        at 
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
        at 
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)
{noformat}

h2. Spring Bits

{code:xml}
<amq:broker useJmx="false" persistent="false">  
                
                <amq:destinationPolicy> 
            <amq:policyMap> 
              <amq:policyEntries> 
                <amq:policyEntry topic=">" producerFlowControl="false" 
memoryLimit="20mb">
                        <amq:dispatchPolicy>
                                <amq:strictOrderDispatchPolicy />
                        </amq:dispatchPolicy>
                        <amq:pendingSubscriberPolicy>
                                <amq:fileCursor />
                        </amq:pendingSubscriberPolicy>
                </amq:policyEntry> 
              </amq:policyEntries> 
            </amq:policyMap> 
        </amq:destinationPolicy> 

         <amq:systemUsage> 
            <amq:systemUsage> 
                <amq:memoryUsage> 
                    <amq:memoryUsage limit="100 mb"/> 
                </amq:memoryUsage> 
                <amq:storeUsage> 
                    <amq:storeUsage limit="100 gb"/> 
                </amq:storeUsage> 
                <amq:tempUsage> 
                    <amq:tempUsage limit="100 gb"/> 
                </amq:tempUsage> 
            </amq:systemUsage> 
        </amq:systemUsage> 
        
        <amq:transportConnectors>
                        <amq:transportConnector uri="tcp://localhost:0" />
                </amq:transportConnectors>
                
        </amq:broker>

        <bean id="pooledConnectionFactory" 
class="org.apache.activemq.pool.PooledConnectionFactory">
                <property name="connectionFactory">
                        <bean 
class="org.apache.activemq.ActiveMQConnectionFactory">
                                <property name="brokerURL" 
value="vm://localhost" />
                                <property name="useAsyncSend" value="true" />
                                <property name="optimizeAcknowledge" 
value="false" />
                                <property 
name="warnAboutUnstartedConnectionTimeout" value="500" />
                        </bean>
                </property>
        </bean>

        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory" 
ref="pooledConnectionFactory" />
        </bean>

        <bean id="DBTopic" class="org.apache.activemq.command.ActiveMQTopic">
                <constructor-arg value="db.topic" />
        </bean>

        <bean id="DBProducer" class="db.DBProducer">
                <property name="template" ref="jmsTemplate" />
                <property name="destination" ref="DBTopic" />
        </bean>

        <bean id="DBConsumer" class="db.DBConsumer" />


        <!-- <jms:listener-container 
connection-factory="pooledConnectionFactory"> -->
        <!-- <jms:listener destination="db.topic" ref="DBConsumer" 
method="messageReceived" 
                /> -->
        <!-- </jms:listener-container> -->

        <bean id="incommingMessageListenerContainer"
                
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" 
ref="pooledConnectionFactory" />
                <property name="destination" ref="DBTopic" />
                <property name="messageListener" ref="DBConsumer" />
                <!-- Retry connection every 10 seconds. -->
                <property name="recoveryInterval" value="10000" />
        </bean>
{code}




--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to