[ 
https://issues.apache.org/jira/browse/AMQ-5409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wawan updated AMQ-5409:
-----------------------
    Description: 
When prefetch is set to 0, the shutdown of a Spring 
DefaultMessageListenerContainer never completes. We think there is a deadlock.

This does not occur when prefetch is 1, or when a TransactionManager is set on 
the Spring DefaultMessageListenerContainer.

DeadLock :

"jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
  java.lang.Thread.State: WAITING
          at java.lang.Object.wait(Object.java:-1)
          at java.lang.Object.wait(Object.java:485)
          at 
org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
          at 
org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
          at 
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
          at java.lang.Thread.run(Thread.java:662)


"Thread-0@594" prio=5 tid=0xb nid=NA waiting
  java.lang.Thread.State: WAITING
          at java.lang.Object.wait(Object.java:-1)
          at java.lang.Object.wait(Object.java:485)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
          at 
org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
          at 
com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
          at 
com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
          at 
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
          at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          at java.lang.reflect.Method.invoke(Method.java:597)
          at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
          at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
          at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at 
org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)

This could be related to the method 
org.apache.activemq.ActiveMQMessageConsumer#receive(long):

            if (info.getPrefetchSize() == 0) {
                md = dequeue(-1); // We let the broker let us know when we timeout.
            } else {
                md = dequeue(timeout);
            }


Issue present in versions : 
        ActiveMQ 5.9.1, 5.10.0
        Springframework 3.2.8.RELEASE, 4.0.5.RELEASE


  was:
When prefetch is set to 0, the shutdown of a Spring Default Message Listener 
never ends. We think there is a deadlock.

This does not occur when prefetch is 1 or when we set a TransactionManager in 
the Spring Default Message Listener.

DeadLock :

"jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
  java.lang.Thread.State: WAITING
          at java.lang.Object.wait(Object.java:-1)
          at java.lang.Object.wait(Object.java:485)
          at 
org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
          at 
org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
          at 
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
          at 
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
          at java.lang.Thread.run(Thread.java:662)


"Thread-0@594" prio=5 tid=0xb nid=NA waiting
  java.lang.Thread.State: WAITING
          at java.lang.Object.wait(Object.java:-1)
          at java.lang.Object.wait(Object.java:485)
          at 
org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
          at 
org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
          at 
com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
          at 
com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
          at 
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
          at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          at java.lang.reflect.Method.invoke(Method.java:597)
          at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
          at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
          at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at 
org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)

This could be related with method 
org.apache.activemq.ActiveMQMessageConsumer#receive(long) :

            if (info.getPrefetchSize() == 0) {
                md = dequeue(-1); // We let the broker let us know when we timeout.
            } else {
                md = dequeue(timeout);
            }


Issue present in versions : 
        ActiveMQ 5.9.1, 5.10.0
        Springframework 3.2.8.RELEASE, 4.0.5.RELEASE



Test Case to reproduce problem:

-----------------------------------------------------------
FILE : com/testprefetch/PrefetchTest.java

package com.testprefetch;

import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

/**
 * Starts and then shuts down the {@code jmsContainer} bean of several Spring
 * application contexts that differ in queue prefetch size and in whether a
 * transaction manager is configured on the container.
 *
 * <p>Every test method is bounded by a 60 second JUnit timeout, so a shutdown
 * that never returns shows up as a test failure.
 */
public class PrefetchTest {

    /** Queue prefetch 0, no transaction manager. */
    @Test(timeout = 60 * 1000)
    public void testStopDMLCPrefetch0() throws InterruptedException {
        startAndStopContainer("/com/testprefetch/testPrefetch_0_Deadlock.appctx.xml");
    }

    /** Queue prefetch 1, no transaction manager. */
    @Test(timeout = 60 * 1000)
    public void testStopDMLCPrefetch1() throws InterruptedException {
        startAndStopContainer("/com/testprefetch/testPrefetch_1.appctx.xml");
    }

    /** Queue prefetch 0, with a transaction manager set on the container. */
    @Test(timeout = 60 * 1000)
    public void testStopDMLCPrefetch0WithTM() throws InterruptedException {
        startAndStopContainer("/com/testprefetch/testPrefetch_0_WithTransactionManager.appctx.xml");
    }

    /**
     * Loads the given application context, starts its {@code jmsContainer} bean,
     * checks that it reports active and running, shuts it down and checks that it
     * no longer reports running or active. The context is closed in all cases.
     *
     * @param applicationContext classpath location of the Spring XML context to load
     * @throws InterruptedException if one of the two-second pauses is interrupted
     */
    public void startAndStopContainer(String applicationContext) throws InterruptedException {
        ClassPathXmlApplicationContext context = null;
        try {
            context = new ClassPathXmlApplicationContext(applicationContext);
            DefaultMessageListenerContainer listenerContainer =
                    (DefaultMessageListenerContainer) context.getBean("jmsContainer");

            System.out.println("Starting container...");
            listenerContainer.start();
            // Pause before checking the container state.
            Thread.sleep(2000);
            boolean started = listenerContainer.isActive() && listenerContainer.isRunning();
            if (!started) {
                fail("Unable to start container.");
            }
            System.out.println("Container started.");

            System.out.println("Stopping container...");
            listenerContainer.shutdown();
            Thread.sleep(2000);

            // The container must report neither running nor active after shutdown.
            assertFalse("container should be down", listenerContainer.isRunning());
            assertFalse("container should be down", listenerContainer.isActive());
        } finally {
            if (context != null) {
                context.close();
            }
        }
    }

}


-----------------------------------------------------------
FILE : com/testprefetch/SimpleMessageListener.java

package com.testprefetch;

import javax.jms.Message;
import javax.jms.MessageListener;


/**
 * Minimal {@link MessageListener} used by the reproduction: it ignores the
 * message content and only prints a line to standard output.
 */
public class SimpleMessageListener implements MessageListener {

    /**
     * Prints a fixed notice for every delivered message.
     *
     * @param message the delivered message; its content is not inspected
     */
    public void onMessage(final Message message) {
        System.out.println("message received !");
    }
}



-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_0_Deadlock.appctx.xml

<!--
  Context for the failing case of the reproduction: queue prefetch is 0 and
  the listener container has no transaction manager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Connection factory using the same port (61166) as the broker defined at the bottom of this file. -->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61166" >
        <amq:prefetchPolicy>
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <!-- Queue prefetch of 0 is the setting under test. -->
                <property name="queuePrefetch"><value>0</value></property>
            </bean>
        </amq:prefetchPolicy>
    </amq:connectionFactory>

    <!-- Listener container on destination TEST; autoStartup is off, the test calls start() itself. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="autoStartup" value="false"/>
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="destinationName" value="TEST"/>
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="4" />
        <property name="maxConcurrentConsumers" value="10" />
        <property name="maxMessagesPerTask" value="100" />
    </bean>

    <bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
    </bean>


    <!--  broker: non-persistent, JMX disabled, listening on TCP port 61166 -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://0.0.0.0:61166" />
        </amq:transportConnectors>
    </amq:broker>

</beans>

-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_0_WithTransactionManager.appctx.xml

<!--
  Context for the comparison case of the reproduction: queue prefetch is 0,
  but the listener container is given a JmsTransactionManager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Connection factory using the same port (61166) as the broker defined at the bottom of this file. -->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61166" >
        <amq:prefetchPolicy>
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <!-- Queue prefetch of 0, as in the failing case. -->
                <property name="queuePrefetch"><value>0</value></property>
            </bean>
        </amq:prefetchPolicy>
    </amq:connectionFactory>

    <!-- Listener container on destination TEST; autoStartup is off, the test calls start() itself. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="autoStartup" value="false"/>
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="destinationName" value="TEST"/>
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="4" />
        <property name="maxConcurrentConsumers" value="10" />
        <property name="maxMessagesPerTask" value="100" />
        <!-- The only difference from the failing configuration. -->
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>


    <bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
    </bean>


    <!--  broker: non-persistent, JMX disabled, listening on TCP port 61166 -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://0.0.0.0:61166" />
        </amq:transportConnectors>
    </amq:broker>

</beans>

-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_1.appctx.xml

<!--
  Context for the control case of the reproduction: queue prefetch is 1 and
  the listener container has no transaction manager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Connection factory using the same port (61166) as the broker defined at the bottom of this file. -->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61166" >
        <amq:prefetchPolicy>
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <!-- Queue prefetch of 1 instead of 0. -->
                <property name="queuePrefetch"><value>1</value></property>
            </bean>
        </amq:prefetchPolicy>
    </amq:connectionFactory>

    <!-- Listener container on destination TEST; autoStartup is off, the test calls start() itself. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="autoStartup" value="false"/>
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="destinationName" value="TEST"/>
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="4" />
        <property name="maxConcurrentConsumers" value="10" />
        <property name="maxMessagesPerTask" value="100" />
    </bean>

    <bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
    </bean>


    <!--  broker: non-persistent, JMX disabled, listening on TCP port 61166 -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://0.0.0.0:61166" />
        </amq:transportConnectors>
    </amq:broker>

</beans>




> With prefetch 0, Spring Default Message Listener Shutdown never ends
> --------------------------------------------------------------------
>
>                 Key: AMQ-5409
>                 URL: https://issues.apache.org/jira/browse/AMQ-5409
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.9.1, 5.10.0
>         Environment: Ubuntu 13.10 64Bits
>            Reporter: Wawan
>         Attachments: PrefetchTest.java, SimpleMessageListener.java, 
> testPrefetch_0_Deadlock.appctx.xml, 
> testPrefetch_0_WithTransactionManager.appctx.xml, testPrefetch_1.appctx.xml
>
>
> When prefetch is set to 0, the shutdown of a Spring Default Message Listener 
> never ends. We think there is a deadlock.
> This does not occur when prefetch is 1 or when we set a TransactionManager in 
> the Spring Default Message Listener.
> DeadLock :
> "jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
>   java.lang.Thread.State: WAITING
>         at java.lang.Object.wait(Object.java:-1)
>         at java.lang.Object.wait(Object.java:485)
>         at 
> org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
>         at 
> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
>         at 
> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
>         at 
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
>         at 
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
>         at 
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
>         at java.lang.Thread.run(Thread.java:662)
> "Thread-0@594" prio=5 tid=0xb nid=NA waiting
>   java.lang.Thread.State: WAITING
>         at java.lang.Object.wait(Object.java:-1)
>         at java.lang.Object.wait(Object.java:485)
>         at 
> org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
>         at 
> org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
>         at 
> com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
>         at 
> com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>         at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at 
> org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
> This could be related with method 
> org.apache.activemq.ActiveMQMessageConsumer#receive(long) :
>             if (info.getPrefetchSize() == 0) {
>                 md = dequeue(-1); // We let the broker let us know when we 
> timeout.
>             } else {
>                 md = dequeue(timeout);
>             }
> Issue present in versions : 
>       ActiveMQ 5.9.1, 5.10.0
>       Springframework 3.2.8.RELEASE, 4.0.5.RELEASE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to