Hello all,

After hacking up most of our application, I came up with the following simple (I hope?) test case:

    import java.io.IOException;

    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.Topic;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQBytesMessage;
    import org.apache.activemq.command.ActiveMQTopic;

    public final class SlowConsumerTest {
        public static void main(final String[] args)
                throws JMSException, InterruptedException, IOException {
            // Listener that takes 100 ms per message, so it cannot keep up.
            class Listener implements MessageListener {
                public void onMessage(final Message message) {
                    System.out.println("GOT A MESSAGE BEING SLOW");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }

            final Topic topic = new ActiveMQTopic("topic");
            // Embedded in-VM broker, no JMX, no persistence.
            final String brokerURL =
                "vm://localhost?broker.useJmx=false&broker.persistent=false";
            Connection connection =
                new ActiveMQConnectionFactory(brokerURL).createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createConsumer(topic).setMessageListener(new Listener());
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            connection.start();

            // Produce 2048-byte messages as fast as possible, forever.
            while (true) {
                BytesMessage message = new ActiveMQBytesMessage();
                try {
                    message.writeBytes(new byte[2048]);
                    producer.send(message);
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

Whether it hangs seems to depend on the message size and the VM's heap size arguments. With 2048-byte messages and -Xms384m -Xmx512m, it hangs reliably within about 7 messages received by the slow consumer, on Windows XP SP2 with JDK 1.5.0_10 and 1.6.0 and on Linux with JDK 1.6.0. In cases where it doesn't hang, one typically sees an "out of heap space" error after about 20 or 30 messages, but the messages keep going for some reason.

When it hangs, there are 2 or 3 threads running.

The main thread's stack:

    Name: main
    State: WAITING on [EMAIL PROTECTED]
    Total blocked: 2  Total waited: 2

    Stack trace:
    java.lang.Object.wait(Native Method)
    java.lang.Object.wait(Object.java:485)
    org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:91)
    org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:88)
    org.apache.activemq.broker.region.Topic.send(Topic.java:248)
    org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:305)
    org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:381)
    org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:197)
    org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
    org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:98)
    org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:136)
    org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:449)
    org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:604)
    org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:258)
    org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:164)
    org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:95)
    org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
    org.apache.activemq.transport.vm.VMTransport.syncOneWay(VMTransport.java:99)
    org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:86)
    org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
       - locked [EMAIL PROTECTED]
    org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
    org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1545)
       - locked [EMAIL PROTECTED]
    org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:473)
    org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:358)
    SlowConsumerTest.main(SlowConsumerTest.java:41)

The ActiveMQ Session task's stack:

    Name: ActiveMQ Session Task
    State: BLOCKED on [EMAIL PROTECTED] owned by: main
    Total blocked: 3  Total waited: 8

    Stack trace:
    org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:43)
    org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
    org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1647)
    org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:700)
    org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:871)
       - locked [EMAIL PROTECTED]
    org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
    org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
    org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
    org.apache.activemq.thread.PooledTaskRunner.access$1(PooledTaskRunner.java:95)
    org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
    java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    java.lang.Thread.run(Thread.java:619)

The third thread is some kind of timer thread that disappears after a while.

We tested with the latest ActiveMQ 4.2 from trunk and with a 4.2 checkout from a few weeks ago; the problem is the same in both cases.

We'd simply like our producer to block waiting for space if the consumers can't keep up. This is what I expected this code to do (or am I mistaken?). Naturally, if the consumers have a few messages to send, we don't want them to block too, so some kind of per-destination buffer would help here. Should we be doing something differently to achieve this?

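The behaviour asked for above (a producer that blocks when its destination is full, without blocking senders to other destinations) can be sketched in plain Java. This is only a model of the desired semantics under stated assumptions: it does not use ActiveMQ at all, and the class BoundedDestinationSketch and its methods (bufferFor, send, trySend, runSlowConsumerDemo) are hypothetical names invented for this illustration, not part of any broker API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of per-destination bounded buffers; not ActiveMQ code. */
final class BoundedDestinationSketch {
    private final int capacity;
    private final ConcurrentMap<String, BlockingQueue<byte[]>> buffers =
            new ConcurrentHashMap<String, BlockingQueue<byte[]>>();

    BoundedDestinationSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns the buffer for a destination, creating it on first use. */
    BlockingQueue<byte[]> bufferFor(String destination) {
        BlockingQueue<byte[]> queue = buffers.get(destination);
        if (queue == null) {
            BlockingQueue<byte[]> fresh = new ArrayBlockingQueue<byte[]>(capacity);
            queue = buffers.putIfAbsent(destination, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    /** Blocks the caller until this destination's buffer has room. */
    void send(String destination, byte[] payload) throws InterruptedException {
        bufferFor(destination).put(payload);
    }

    /** Non-blocking variant: false means the destination is currently full. */
    boolean trySend(String destination, byte[] payload) {
        return bufferFor(destination).offer(payload);
    }

    /** Sends count messages to a slow consumer; returns how many it received. */
    static int runSlowConsumerDemo(final int count, int capacity) {
        final BoundedDestinationSketch broker = new BoundedDestinationSketch(capacity);
        final AtomicInteger received = new AtomicInteger();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        broker.bufferFor("topic").take();
                        Thread.sleep(5); // deliberately slow listener
                        received.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) {
                // Blocks here whenever the "topic" buffer is full.
                broker.send("topic", new byte[2048]);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }
}
```

In this model, a full "topic" buffer makes send("topic", ...) wait, while trySend("replies", ...) still succeeds because each destination has its own queue. The buffer never holds more than its capacity, so heap use stays bounded no matter how slow the listener is.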
We don't want to spool to disk, since our data is already being read from a disk, so if the producer dies, it can just restart and resend the data.

Thanks for all the help so far.

Cheers,

Albert

On Tue, 23 Jan 2007, Rob Davies wrote:

> reproduced - will fix shortly
>
> On 23 Jan 2007, at 07:26, James Strachan wrote:
>
> >I thought that the new spool-to-disk feature should kick in
> >automatically - obviously not :). Anyone know if that has to be
> >explicitly enabled?
> >
> >BTW are you explicitly disabling persistence on ActiveMQ? I think you
> >might need to keep it enabled for spooling to disk to work.