[
https://issues.apache.org/jira/browse/AMQ-5409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wawan updated AMQ-5409:
-----------------------
Description:
When prefetch is set to 0, the shutdown of a Spring Default Message Listener
never ends. We think there is a deadlock.
This does not occur when prefetch is 1 or when we set a TransactionManager in
the Spring Default Message Listener.
DeadLock :
"jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:485)
at
org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
at
org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
at
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
at java.lang.Thread.run(Thread.java:662)
"Thread-0@594" prio=5 tid=0xb nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:485)
at
org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
at
org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
at
com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
at
com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
at
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
This could be related with method
org.apache.activemq.ActiveMQMessageConsumer#receive(long) :
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we
timeout.
} else {
md = dequeue(timeout);
}
Issue present in versions :
ActiveMQ 5.9.1, 5.10.0
Springframework 3.2.8.RELEASE, 4.0.5.RELEASE
was:
When prefetch is set to 0, the shutdown of a Spring Default Message Listener
never ends. We think there is a deadlock.
This does not occur when prefetch is 1 or when we set a TransactionManager in
the Spring Default Message Listener.
DeadLock :
"jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:485)
at
org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
at
org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
at
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
at java.lang.Thread.run(Thread.java:662)
"Thread-0@594" prio=5 tid=0xb nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:485)
at
org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
at
org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
at
com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
at
com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
at
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
This could be related with method
org.apache.activemq.ActiveMQMessageConsumer#receive(long) :
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we
timeout.
} else {
md = dequeue(timeout);
}
Issue present in versions :
ActiveMQ 5.9.1, 5.10.0
Springframework 3.2.8.RELEASE, 4.0.5.RELEASE
Test Case to reproduce problem:
-----------------------------------------------------------
FILE : com/testprefetch/PrefetchTest.java
package com.testprefetch;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class PrefetchTest {
@Test(timeout = 60 * 1000)
public void testStopDMLCPrefetch0() throws InterruptedException {
startAndStopContainer("/com/testprefetch/testPrefetch_0_Deadlock.appctx.xml");
}
@Test(timeout = 60 * 1000)
public void testStopDMLCPrefetch1() throws InterruptedException {
startAndStopContainer("/com/testprefetch/testPrefetch_1.appctx.xml");
}
@Test(timeout = 60 * 1000)
public void testStopDMLCPrefetch0WithTM() throws InterruptedException {
startAndStopContainer("/com/testprefetch/testPrefetch_0_WithTransactionManager.appctx.xml");
}
public void startAndStopContainer(String applicationContext) throws
InterruptedException {
ClassPathXmlApplicationContext appctx = null;
try {
appctx = new ClassPathXmlApplicationContext(applicationContext);
DefaultMessageListenerContainer container =
(DefaultMessageListenerContainer) appctx.getBean("jmsContainer");
System.out.println("Starting container...");
container.start();
Thread.sleep(2000);
if (container.isActive() && container.isRunning()) {
System.out.println("Container started.");
} else {
fail("Unable to start container.");
}
System.out.println("Stopping container...");
container.shutdown();
Thread.sleep(2000);
// verify if container has been shut down
assertFalse("container should be down", container.isRunning());
assertFalse("container should be down", container.isActive());
} finally {
if (appctx != null) {
appctx.close();
}
}
}
}
-----------------------------------------------------------
FILE : com/testprefetch/SimpleMessageListener.java
package com.testprefetch;
import javax.jms.Message;
import javax.jms.MessageListener;
public class SimpleMessageListener implements MessageListener{
public void onMessage(Message message) {
System.out.println("message received !");
}
}
-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_0_Deadlock.appctx.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61166" >
<amq:prefetchPolicy>
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch"><value>0</value></property>
</bean>
</amq:prefetchPolicy>
</amq:connectionFactory>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="autoStartup" value="false"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destinationName" value="TEST"/>
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="4" />
<property name="maxConcurrentConsumers" value="10" />
<property name="maxMessagesPerTask" value="100" />
</bean>
<bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
</bean>
<!-- broker -->
<amq:broker id="broker" useJmx="false" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://0.0.0.0:61166" />
</amq:transportConnectors>
</amq:broker>
</beans>
-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_0_WithTransactionManager.appctx.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61166" >
<amq:prefetchPolicy>
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch"><value>0</value></property>
</bean>
</amq:prefetchPolicy>
</amq:connectionFactory>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="autoStartup" value="false"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destinationName" value="TEST"/>
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="4" />
<property name="maxConcurrentConsumers" value="10" />
<property name="maxMessagesPerTask" value="100" />
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="transactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
</bean>
<!-- broker -->
<amq:broker id="broker" useJmx="false" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://0.0.0.0:61166" />
</amq:transportConnectors>
</amq:broker>
</beans>
-----------------------------------------------------------
FILE : com/testprefetch/testPrefetch_1.appctx.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61166" >
<amq:prefetchPolicy>
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch"><value>1</value></property>
</bean>
</amq:prefetchPolicy>
</amq:connectionFactory>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="autoStartup" value="false"/>
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destinationName" value="TEST"/>
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="4" />
<property name="maxConcurrentConsumers" value="10" />
<property name="maxMessagesPerTask" value="100" />
</bean>
<bean id="messageListener" class="com.testprefetch.SimpleMessageListener">
</bean>
<!-- broker -->
<amq:broker id="broker" useJmx="false" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://0.0.0.0:61166" />
</amq:transportConnectors>
</amq:broker>
</beans>
> With prefetch 0, Spring Default Message Listener Shutdown never ends
> --------------------------------------------------------------------
>
> Key: AMQ-5409
> URL: https://issues.apache.org/jira/browse/AMQ-5409
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.9.1, 5.10.0
> Environment: Ubuntu 13.10 64Bits
> Reporter: Wawan
> Attachments: PrefetchTest.java, SimpleMessageListener.java,
> testPrefetch_0_Deadlock.appctx.xml,
> testPrefetch_0_WithTransactionManager.appctx.xml, testPrefetch_1.appctx.xml
>
>
> When prefetch is set to 0, the shutdown of a Spring Default Message Listener
> never ends. We think there is a deadlock.
> This does not occur when prefetch is 1 or when we set a TransactionManager in
> the Spring Default Message Listener.
> DeadLock :
> "jmsContainer-4@2020" prio=5 tid=0x14 nid=NA waiting
> java.lang.Thread.State: WAITING
> at java.lang.Object.wait(Object.java:-1)
> at java.lang.Object.wait(Object.java:485)
> at
> org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:90)
> at
> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:478)
> at
> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:629)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
> at java.lang.Thread.run(Thread.java:662)
> "Thread-0@594" prio=5 tid=0xb nid=NA waiting
> java.lang.Thread.State: WAITING
> at java.lang.Object.wait(Object.java:-1)
> at java.lang.Object.wait(Object.java:485)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer.doShutdown(DefaultMessageListenerContainer.java:543)
> at
> org.springframework.jms.listener.AbstractJmsListeningContainer.shutdown(AbstractJmsListeningContainer.java:237)
> at
> com.testprefetch.PrefetchTest.startAndStopContainer(PrefetchTest.java:43)
> at
> com.testprefetch.PrefetchTest.testStopDMLCPrefetch0(PrefetchTest.java:15)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
> This could be related with method
> org.apache.activemq.ActiveMQMessageConsumer#receive(long) :
> if (info.getPrefetchSize() == 0) {
> md = dequeue(-1); // We let the broker let us know when we
> timeout.
> } else {
> md = dequeue(timeout);
> }
> Issue present in versions :
> ActiveMQ 5.9.1, 5.10.0
> Springframework 3.2.8.RELEASE, 4.0.5.RELEASE
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)