Hello all,

We've run into a possible issue when stopping a connection from a message listener. Our application listens for a "start" message, upon which it does the following in a message listener:

1. It stops the connection
2. It creates another session from the connection
3. It constructs one of our objects, passing it the session
4. The object uses the session to create a few more consumers
5. When the constructor returns, the connection is started again

Our object looks something like this:

    public final class MultipleListener {
        private int state;

        private class Listener1 implements MessageListener {
            public void onMessage(Message message) {
                state += 1;
            }
        }

        private class Listener2 implements MessageListener {
            public void onMessage(Message message) {
                state += 2;
            }
        }

        public MultipleListener(Session session, Destination destination) throws JMSException {
            MessageConsumer consumer1 = session.createConsumer(destination);
            consumer1.setMessageListener(new Listener1());
            MessageConsumer consumer2 = session.createConsumer(destination);
            consumer2.setMessageListener(new Listener2());
        }
    }

Just to check that I read my JMS spec correctly: since both consumers are created from the same session, we don't have to synchronize access to MultipleListener's state since only one thread from the messaging system will call into the message listeners at any given time. Is this right?

And we're doing something like this to create MultipleListeners:

    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(...);
    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(new StartListener(connection));
    connection.start();

With StartListener something like:

    class StartListener implements MessageListener {
        private Connection connection;

        public StartListener(Connection connection) {
            this.connection = connection;
        }

        public void onMessage(Message message) {
            connection.stop();
            new MultipleListener(connection.createSession(...));
            connection.start();
        }
    }

The reason I'm stopping the connection prior to making the MultipleListener is that I don't want the message listeners to be called until the constructor is done.
The thinking here is that all MultipleListener's internal state might not be ready to receive messages until the constructor has returned. Is this a valid concern? Is there a way to deal with this without stopping the connection?

We'd like each MultipleListener to have its own session so that they can process messages in parallel, so plugging them in on the same session as StartListener isn't really what we want to do. We could make this single session work by putting in a queue and having a thread we create pass messages on to the MultipleListeners, but I'm hoping we can get away using only ActiveMQ's Session tasks (or is this an abuse of the session threads?). If we shouldn't be using the session threads to drive our components, is there a nice example somewhere of a good alternative (probably some kind of thread pool)?

With all that said, we're seeing some problems when the message listener stops the connection. The threads end up looking like this:

Name: ActiveMQ Transport: tcp://localhost/192.168.1.63:61616
State: RUNNABLE
Total blocked: 44  Total waited: 0

Stack trace:
    java.net.SocketInputStream.socketRead0(Native Method)
    java.net.SocketInputStream.read(SocketInputStream.java:129)
    org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:49)
    org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:56)
    java.io.DataInputStream.readInt(DataInputStream.java:370)
    org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
    org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
    org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
    java.lang.Thread.run(Thread.java:619)

====

Name: ActiveMQ Scheduler
State: TIMED_WAITING on [EMAIL PROTECTED]
Total blocked: 0  Total waited: 183

Stack trace:
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
    java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
    java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
    java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
    java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
    java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Scheduler
State: TIMED_WAITING on [EMAIL PROTECTED]
Total blocked: 0  Total waited: 203

Stack trace:
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1927)
    java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
    java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:582)
    java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:575)
    java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:946)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
    java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Session Task
State: WAITING on [EMAIL PROTECTED]
Total blocked: 0  Total waited: 5

Stack trace:
    java.lang.Object.wait(Native Method)
    java.lang.Object.wait(Object.java:485)
    org.apache.activemq.thread.PooledTaskRunner.shutdown(PooledTaskRunner.java:89)
    org.apache.activemq.ActiveMQSessionExecutor.stop(ActiveMQSessionExecutor.java:117)
    org.apache.activemq.ActiveMQSession.stop(ActiveMQSession.java:1468)
    org.apache.activemq.ActiveMQConnection.stop(ActiveMQConnection.java:495)
    XXX ... our onMessage handler that stops the connection ... XXX
    org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:870)
       - locked [EMAIL PROTECTED]
    org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
    org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
    org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
    org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
    org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
    java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    java.lang.Thread.run(Thread.java:619)

=========

Name: ActiveMQ Session Task
State: BLOCKED on [EMAIL PROTECTED] owned by: ActiveMQ Session Task
Total blocked: 1  Total waited: 0

Stack trace:
    org.apache.activemq.MessageDispatchChannel.dequeueNoWait(MessageDispatchChannel.java:93)
    org.apache.activemq.ActiveMQMessageConsumer.iterate(ActiveMQMessageConsumer.java:928)
    org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:156)
    org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
    org.apache.activemq.thread.PooledTaskRunner.access$100(PooledTaskRunner.java:26)
    org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
    java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    java.lang.Thread.run(Thread.java:619)

=========

I haven't been able to reproduce this problem in a test case yet (seems to depend very much on the timing between "start" messages), but maybe someone can spot something using the stack traces?

Are we supposed to be able to stop the connection from inside a message listener or is this pattern broken?
If it's broken, how should one deal with this kind of situation where you want to add a bunch of consumers to an already-started connection but only want to enable them once everyone is there (and once you've done some more setup)?

We're running the latest ActiveMQ 4.2 from trunk.

Thanks for reading.

Cheers,

Albert
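
P.S. For concreteness, the queue-and-thread alternative mentioned above might look roughly like the sketch below. It deliberately uses only java.util.concurrent (and Java 8 lambdas) so that it runs without a broker. SerialComponent, deliver, currentState, close and HandOffDemo are hypothetical names made up for illustration, not JMS or ActiveMQ types. The assumption is that a single JMS listener would call deliver from its onMessage, and that the increments 1 and 2 stand in for what Listener1 and Listener2 do.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for MultipleListener; not a JMS or ActiveMQ type.
final class SerialComponent {
    // Read and written only on the worker thread, so no locking is needed.
    private int state;

    // One thread per component: work for a single component runs in order,
    // while separate components run in parallel with each other.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Would be called from a listener's onMessage; queues the work and returns at once.
    void deliver(final int increment) {
        worker.execute(() -> state += increment);
    }

    // Reads the state on the worker thread, after all previously queued work.
    int currentState() throws Exception {
        Callable<Integer> read = () -> state;
        return worker.submit(read).get();
    }

    // Lets already queued work finish, then releases the worker thread.
    void close() {
        worker.shutdown();
    }
}

final class HandOffDemo {
    public static void main(String[] args) throws Exception {
        SerialComponent component = new SerialComponent();
        try {
            component.deliver(1); // what Listener1 does
            component.deliver(2); // what Listener2 does
            System.out.println(component.currentState()); // prints 3
        } finally {
            component.close();
        }
    }
}
```

With this shape the component is fully constructed before anything calls deliver, so nothing has to be stopped while it is being set up. Whether that is preferable to one session per MultipleListener is exactly the open question above.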