FWIW I"d definitely recommend you use non-persistent sending on your
producers (otherwise the spool to disk won't kick in). Also are you
actually ack'ing your messages?

On 1/21/07, Albert Strasheim <[EMAIL PROTECTED]> wrote:
Hello all

On Thu, 18 Jan 2007, James.Strachan wrote:

> Have you tried setting the UsageManager configuration to something large? Out
> of the box its set to something really tiny.
>
> BTW if you have an issue, please mention the version of the software you are
> using.
> http://incubator.apache.org/activemq/support.html
> James

Some more details from the day's debugging.

We're running ActiveMQ 4.2 from trunk as checked out on Friday.

Our test has a thread that produces messages. On another connection in
the same application, we have a consumer with a message listener set.
This listener produces a new message.

Debugging with Eclipse, I see that the first time the code goes into
UsageManager.waitForSpace, it is calling from the thread that produces.
The stack looks like this:

Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))
        UsageManager.waitForSpace() line: 91
        UsageManager.waitForSpace() line: 88
        Topic.send(ConnectionContext, Message) line: 248
        ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) 
line: 305
        ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) 
line: 381
        TransactionBroker.send(ConnectionContext, Message) line: 197
        AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
        CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
        BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) 
line: 136
        TransportConnection.processMessage(Message) line: 449
        ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
        TransportConnection.service(Command) line: 258
        TransportConnection$1.onCommand(Object) line: 164
        ResponseCorrelator.onCommand(Object) line: 95
        MutexTransport(TransportFilter).onCommand(Object) line: 65
        VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
        VMTransportServer$1(VMTransport).oneway(Object) line: 86
        MutexTransport.oneway(Object) line: 44
        ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
        ResponseCorrelator.request(Object) line: 74
        ActiveMQConnection.syncSendPacket(Command) line: 1185
        ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, 
Message, int, int, long) line: 1547
        ActiveMQMessageProducer.send(Destination, Message, int, int, long) 
line: 473
        ActiveMQMessageProducer.send(Message) line: 358
        <thread in my application calling send>

If I then allow the VM to continue, it runs for a short while and then
the single remaining ActiveMQ Session Task thread blocks:

Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in 
UsageManager))
        UsageManager.waitForSpace() line: 91
        UsageManager.waitForSpace() line: 88
        Topic.send(ConnectionContext, Message) line: 248
        ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) 
line: 305
        ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) 
line: 381
        TransactionBroker.send(ConnectionContext, Message) line: 197
        AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
        CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
        BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) 
line: 136
        TransportConnection.processMessage(Message) line: 449
        ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
        TransportConnection.service(Command) line: 258
        TransportConnection$1.onCommand(Object) line: 164
        ResponseCorrelator.onCommand(Object) line: 95
        MutexTransport(TransportFilter).onCommand(Object) line: 65
        VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
        VMTransportServer$1(VMTransport).oneway(Object) line: 86
        MutexTransport.oneway(Object) line: 44
        ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
        ResponseCorrelator.request(Object) line: 74
        ActiveMQConnection.syncSendPacket(Command) line: 1185
        ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, 
Message, int, int, long) line: 1547
        ActiveMQMessageProducer.send(Destination, Message, int, int, long) 
line: 473
        ActiveMQMessageProducer.send(Message) line: 358

        <onMessage of listener in my application is called. it calls send.>

        ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870
        ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99
        ActiveMQSessionExecutor.iterate() line: 166
        PooledTaskRunner.runTask() line: 111
        PooledTaskRunner.access$1(PooledTaskRunner) line: 95
        PooledTaskRunner$1.run() line: 44
        ThreadPoolExecutor$Worker.runTask(Runnable) line: 885
        ThreadPoolExecutor$Worker.run() line: 907
        Thread.run() line: 619

There doesn't seem to be any other ActiveMQ threads around, so I can't
see how this thread is going to wake up from its

usageMutex.wait();

that is blocking on.

Any thoughts? Is it wrong to produce straight from a message listener?

By the way, I noticed that increasing the memory available to the
memory manager from 20 MB to 128 MB actually makes the application block
sooner rather than later. I also experimented with some prefetch policy
settings, but these don't make much of a difference. Currently, we're
simply using

vm://localhost?broker.useJmx=true&broker.persistent=false

as our broker URL.

We're using topics throughout. Sessions' acknowledge modes are all set
to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string
and integer properties.

Cheers,

Albert



--

James
-------
http://radio.weblogs.com/0112098/

Reply via email to