Btw, the docs on http://www.activemq.org/site/consumer-dispatch-async.html refer to dispatchAsync but the code uses asyncDispatch. I guess this is an oversight, right ?
On 8/11/06, Hiram Chirino <[EMAIL PROTECTED]> wrote:
I think that if we enable async dispatch this issue should go away. This would only affect vm transport since the transport oneways. We should look into making async to be dispatch be the default when using the vm transport. On 8/11/06, Guillaume Nodet <[EMAIL PROTECTED]> wrote: > I sometime have deadlocks while running junit tests that involve ActiveMQ. > Following is a stack trace i dumped. > As you can see, the two last threads are deadlocked because of the > MutexTransport. > This cause the first thread (main thread) to hang forever. > > Thread [main] (Suspended) > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=292) > ResponseCorrelator.oneway(Command) line: 58 > ManagedTransportConnection(TransportConnection).dispatch(Command) > line: 211 > > ManagedTransportConnection(AbstractConnection).processDispatch(Command) > line: 628 > ManagedTransportConnection(AbstractConnection).dispatchSync(Command) > line: 605 > TopicSubscription.dispatch(MessageReference) line: 315 > TopicSubscription.add(MessageReference) line: 74 > SimpleDispatchPolicy.dispatch(ConnectionContext, MessageReference, > MessageEvaluationContext, List) line: 50 > Topic.dispatch(ConnectionContext, Message) line: 443 > Topic.send(ConnectionContext, Message) line: 254 > ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) > line: 225 > ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) > line: 345 > TransactionBroker.send(ConnectionContext, Message) line: 192 > AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: > 113 > CompositeDestinationBroker.send(ConnectionContext, Message) line: > 97 > BrokerService$2(MutableBrokerFilter).send(ConnectionContext, > Message) line: 126 > > ManagedTransportConnection(AbstractConnection).processMessage(Message) line: > 377 > ActiveMQObjectMessage(ActiveMQMessage).visit(CommandVisitor) line: > 590 > ManagedTransportConnection(AbstractConnection).service(Command) > line: 226 > TransportConnection$1.onCommand(Command) line: 62 > ResponseCorrelator.onCommand(Command) line: 91 > MutexTransport(TransportFilter).onCommand(Command) line: 63 > VMTransportServer$1(VMTransport).oneway(Command) line: 76 > MutexTransport.oneway(Command) line: 44 > => MutexTransport (id=314) > ResponseCorrelator.oneway(Command) line: 58 > ActiveMQConnection.asyncSendPacket(Command) line: 1092 > ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, > Message, int, int, long) line: 1553 > ActiveMQMessageProducer.send(Destination, Message, int, int, long) > line: 462 > ActiveMQMessageProducer.send(Message) line: 356 > JCAFlow.sendJmsMessage(Destination, Serializable, boolean, boolean) > line: 707 > JCAFlow.onInternalEndpointUnregistered(EndpointEvent, boolean) line: > 462 > JCAFlow$1.internalEndpointUnregistered(EndpointEvent) line: 245 > EndpointRegistry.fireEvent(ServiceEndpoint, int) line: 575 > EndpointRegistry.unregisterInternalEndpoint(ComponentContext, > InternalEndpoint) line: 199 > Registry.deactivateEndpoint(ComponentContext, InternalEndpoint) > line: 205 > ComponentContextImpl.deactivateEndpoint(ServiceEndpoint) line: > 157 > ReceiverComponent(PojoSupport).shutDown() line: 103 > ComponentMBeanImpl.doShutDown() line: 335 > ComponentRegistry.shutDown() line: 105 > Registry.shutDown() line: 142 > SpringJBIContainer(JBIContainer).shutDown() line: 601 > SpringJBIContainer.destroy() line: 184 > DisposableBeanAdapter.destroy() line: 97 > DefaultListableBeanFactory(AbstractBeanFactory).destroyBean(String, > Object) line: 1159 > > DefaultListableBeanFactory(AbstractBeanFactory).destroyDisposableBean(String) > line: 1131 > DefaultListableBeanFactory(AbstractBeanFactory).destroySingletons() > line: 660 > ClassPathXmlApplicationContext(AbstractApplicationContext).doClose() > line: 594 > ClassPathXmlApplicationContext(AbstractApplicationContext).close() > line: 563 > ClassPathXmlApplicationContext(AbstractApplicationContext).destroy() > line: 552 > JmsSpringJcaTest(SpringTestSupport).tearDown() line: 66 > JmsSpringJcaTest.tearDown() line: 52 > JmsSpringJcaTest(TestCase).runBare() line: 130 > TestResult$1.protect() line: 106 > TestResult.runProtected(Test, Protectable) line: 124 > TestResult.run(TestCase) line: 109 > JmsSpringJcaTest(TestCase).run(TestResult) line: 118 > TestSuite.runTest(Test, TestResult) line: 208 > TestSuite.run(TestResult) line: 203 > JUnit3TestReference.run(TestExecution) line: 128 > TestExecution.run(ITestReference[]) line: 38 > RemoteTestRunner.runTests(String[], String, TestExecution) line: > 460 > RemoteTestRunner.runTests(TestExecution) line: 673 > RemoteTestRunner.run() line: 386 > RemoteTestRunner.main(String[]) line: 196 > Thread [ActiveMQ Session Task] (Suspended) > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=419) > ResponseCorrelator.oneway(Command) line: 58 > ActiveMQConnection.asyncSendPacket(Command) line: 1092 > ActiveMQSession.<init>(ActiveMQConnection, SessionId, int, boolean, > boolean) line: 227 > ActiveMQConnection.createSession(boolean, int) line: 274 > ServerSessionPoolImpl.createServerSessionImpl() line: 60 > ServerSessionPoolImpl.getServerSession() line: 113 > ActiveMQConnectionConsumer.dispatch(MessageDispatch) line: 135 > ActiveMQConnection.onCommand(Command) line: 1403 > ResponseCorrelator.onCommand(Command) line: 91 > MutexTransport(TransportFilter).onCommand(Command) line: 63 > VMTransport.oneway(Command) line: 76 > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=292) > ResponseCorrelator.oneway(Command) line: 58 > ManagedTransportConnection(TransportConnection).dispatch(Command) > line: 211 > > ManagedTransportConnection(AbstractConnection).processDispatch(Command) > line: 628 > ManagedTransportConnection(AbstractConnection).dispatchSync(Command) > line: 605 > TopicSubscription.dispatch(MessageReference) line: 315 > TopicSubscription.add(MessageReference) line: 74 > SimpleDispatchPolicy.dispatch(ConnectionContext, MessageReference, > MessageEvaluationContext, List) line: 50 > Topic.dispatch(ConnectionContext, Message) line: 443 > Topic.send(ConnectionContext, Message) line: 254 > ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) > line: 225 > ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) > line: 345 > TransactionBroker.send(ConnectionContext, Message) line: 192 > AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: > 113 > CompositeDestinationBroker.send(ConnectionContext, Message) line: > 97 > BrokerService$2(MutableBrokerFilter).send(ConnectionContext, > Message) line: 126 > > ManagedTransportConnection(AbstractConnection).processMessage(Message) line: > 377 > ActiveMQObjectMessage(ActiveMQMessage).visit(CommandVisitor) line: > 590 > ManagedTransportConnection(AbstractConnection).service(Command) > line: 226 > TransportConnection$1.onCommand(Command) line: 62 > ResponseCorrelator.onCommand(Command) line: 91 > MutexTransport(TransportFilter).onCommand(Command) line: 63 > VMTransportServer$1(VMTransport).oneway(Command) line: 76 > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=534) > ResponseCorrelator.oneway(Command) line: 58 > ActiveMQConnection.asyncSendPacket(Command) line: 1092 > ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, > Message, int, int, long) line: 1553 > ActiveMQMessageProducer.send(Destination, Message, int, int, long) > line: 462 > ActiveMQMessageProducer.send(Message) line: 356 > JCAFlow.sendJmsMessage(Destination, Serializable, boolean, boolean) > line: 707 > JCAFlow.onInternalEndpointRegistered(EndpointEvent, boolean) line: > 445 > JCAFlow.onAdvisoryMessage(Object) line: 618 > JCAFlow$4.onMessage(Message) line: 336 > ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 795 > ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 96 > ActiveMQSessionExecutor.iterate() line: 149 > PooledTaskRunner.runTask() line: 110 > PooledTaskRunner.access$100(PooledTaskRunner) line: 25 > PooledTaskRunner$1.run() line: 43 > ThreadPoolExecutor$Worker.runTask(Runnable) line: 650 > ThreadPoolExecutor$Worker.run() line: 675 > Thread.run() line: 595 > Thread [Thread-87] (Suspended) > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=292) > ResponseCorrelator.oneway(Command) line: 58 > ManagedTransportConnection(TransportConnection).dispatch(Command) > line: 211 > TransportConnection$1.onCommand(Command) line: 64 > ResponseCorrelator.onCommand(Command) line: 91 > MutexTransport(TransportFilter).onCommand(Command) line: 63 > VMTransportServer$1(VMTransport).oneway(Command) line: 76 > MutexTransport.oneway(Command) line: 44 > ==> MutexTransport (id=419) > ResponseCorrelator.asyncRequest(Command, ResponseCallback) line: > 66 > ResponseCorrelator.request(Command) line: 71 > ActiveMQConnection.syncSendPacket(Command) line: 1112 > ActiveMQConnectionConsumer.<init>(ActiveMQConnection, > ServerSessionPool, ConsumerInfo) line: 85 > ActiveMQConnection.createConnectionConsumer(Destination, String, > ServerSessionPool, int, boolean) line: 1022 > ActiveMQEndpointWorker$1.run() line: 163 > WorkerContext.run() line: 291 > PooledExecutor$Worker.run() line: not available > Thread.run() line: 595 > > > -- > Cheers, > Guillaume Nodet > > -- Regards, Hiram Blog: http://hiramchirino.com
-- Cheers, Guillaume Nodet