Hello all

After hacking up most of our application, I came up with the following 
simple (I hope?) test case:

public final class SlowConsumerTest {
    public static void main(final String[] args) throws JMSException, 
InterruptedException, IOException {
        class Listener implements MessageListener {
            public void onMessage(final Message message) {
                System.out.println("GOT A MESSAGE BEING SLOW");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        final Topic topic = new ActiveMQTopic("topic");
        final String brokerURL = 
"vm://localhost?broker.useJmx=false&broker.persistent=false";
        Connection connection = new 
ActiveMQConnectionFactory(brokerURL).createConnection();
        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
        session.createConsumer(topic).setMessageListener(new Listener());
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        connection.start();
        while (true) {
            BytesMessage message = new ActiveMQBytesMessage();
            try {
                message.writeBytes(new byte[2048]);
                producer.send(message);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Whether it hangs or not depends seems to depend on the message size and 
the VM's heap size arguments.

With 2048 byte messages with -Xms384m -Xmx512m it hangs reliably within 
about 7 messages received by the slow consumer on Windows XP SP2 with 
JDK 1.5.0_10 and 1.6.0 and on Linux with JDK 1.6.0.

In cases where it doesn't hang, one typically sees a "out of heap space" 
error after about 20 or 30 messages, but the messages keep going for 
some reason.

When it hangs, there are 2 or 3 threads running.

The main thread's stack:

Name: main
State: WAITING on [EMAIL PROTECTED]
Total blocked: 2  Total waited: 2

Stack trace: 
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:91)
org.apache.activemq.memory.UsageManager.waitForSpace(UsageManager.java:88)
org.apache.activemq.broker.region.Topic.send(Topic.java:248)
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:305)
org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:381)
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:197)
org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:98)
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:136)
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:449)
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:604)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:258)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:164)
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:95)
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
org.apache.activemq.transport.vm.VMTransport.syncOneWay(VMTransport.java:99)
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:86)
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
   - locked [EMAIL PROTECTED]
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1545)
   - locked [EMAIL PROTECTED]
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:473)
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:358)
SlowConsumerTest.main(SlowConsumerTest.java:41)

The ActiveMQ Session task's stack:

Name: ActiveMQ Session Task
State: BLOCKED on [EMAIL PROTECTED] owned by: main
Total blocked: 3  Total waited: 8

Stack trace: 
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:43)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1165)
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1647)
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:700)
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:871)
   - locked [EMAIL PROTECTED]
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:99)
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:166)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:111)
org.apache.activemq.thread.PooledTaskRunner.access$1(PooledTaskRunner.java:95)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:44)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)

The third thread is some kind of timer thread that disappears after a 
while.

We teested with the latest ActiveMQ 4.2 from trunk and 4.2 checked out 
a few weeks ago -- same problem in both cases.

We'd simply like our producer to block waiting for space if the 
consumers can't keep up -- this is what I expected this code to do (or 
am I mistaken?). Naturally, if the consumers have a few messages to 
send, we don't want them to block too, so some kind of per-destination 
buffer would help here.

Should we be doing something differently to achieve this? We don't want 
to spool to disk, since our data is already being read from a disk, so 
if the producer dies, it can just restart and resend the data.

Thanks for all the help so far.

Cheers,

Albert

On Tue, 23 Jan 2007, Rob Davies wrote:

> reproduced - will fix shortly
> On 23 Jan 2007, at 07:26, James Strachan wrote:
> 
> >I thought that the new spool-to-disk feature should kick in
> >automatically - obviously not :). Anyone know if that has to be
> >explicitly enabled?
> >
> >BTW are you explicitly disabling persistence on ActiveMQ? I think you
> >might need to keep it enabled for spooling to disk to work.

Reply via email to