FWIW I'd definitely recommend you use non-persistent sending on your producers (otherwise the spool to disk won't kick in). Also, are you actually ack'ing your messages?
On 1/21/07, Albert Strasheim <[EMAIL PROTECTED]> wrote:
Hello all

On Thu, 18 Jan 2007, James.Strachan wrote:
> Have you tried setting the UsageManager configuration to something large? Out
> of the box it's set to something really tiny.
>
> BTW if you have an issue, please mention the version of the software you are
> using.
> http://incubator.apache.org/activemq/support.html
> James

Some more details from the day's debugging.

We're running ActiveMQ 4.2 from trunk as checked out on Friday.

Our test has a thread that produces messages. On another connection in the same application, we have a consumer with a message listener set. This listener produces a new message.

Debugging with Eclipse, I see that the first time the code goes into UsageManager.waitForSpace, it is called from the thread that produces. The stack looks like this:

Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))
    UsageManager.waitForSpace() line: 91
    UsageManager.waitForSpace() line: 88
    Topic.send(ConnectionContext, Message) line: 248
    ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
    ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
    TransactionBroker.send(ConnectionContext, Message) line: 197
    AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
    CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
    BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
    TransportConnection.processMessage(Message) line: 449
    ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
    TransportConnection.service(Command) line: 258
    TransportConnection$1.onCommand(Object) line: 164
    ResponseCorrelator.onCommand(Object) line: 95
    MutexTransport(TransportFilter).onCommand(Object) line: 65
    VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
    VMTransportServer$1(VMTransport).oneway(Object) line: 86
    MutexTransport.oneway(Object) line: 44
    ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
    ResponseCorrelator.request(Object) line: 74
    ActiveMQConnection.syncSendPacket(Command) line: 1185
    ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
    ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
    ActiveMQMessageProducer.send(Message) line: 358
    <thread in my application calling send>

If I then allow the VM to continue, it runs for a short while and then the single remaining ActiveMQ Session Task thread blocks:

Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))
    UsageManager.waitForSpace() line: 91
    UsageManager.waitForSpace() line: 88
    Topic.send(ConnectionContext, Message) line: 248
    ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
    ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
    TransactionBroker.send(ConnectionContext, Message) line: 197
    AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126
    CompositeDestinationBroker.send(ConnectionContext, Message) line: 98
    BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
    TransportConnection.processMessage(Message) line: 449
    ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604
    TransportConnection.service(Command) line: 258
    TransportConnection$1.onCommand(Object) line: 164
    ResponseCorrelator.onCommand(Object) line: 95
    MutexTransport(TransportFilter).onCommand(Object) line: 65
    VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99
    VMTransportServer$1(VMTransport).oneway(Object) line: 86
    MutexTransport.oneway(Object) line: 44
    ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69
    ResponseCorrelator.request(Object) line: 74
    ActiveMQConnection.syncSendPacket(Command) line: 1185
    ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
    ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
    ActiveMQMessageProducer.send(Message) line: 358
    <onMessage of listener in my application is called. it calls send.>
    ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870
    ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99
    ActiveMQSessionExecutor.iterate() line: 166
    PooledTaskRunner.runTask() line: 111
    PooledTaskRunner.access$1(PooledTaskRunner) line: 95
    PooledTaskRunner$1.run() line: 44
    ThreadPoolExecutor$Worker.runTask(Runnable) line: 885
    ThreadPoolExecutor$Worker.run() line: 907
    Thread.run() line: 619

There don't seem to be any other ActiveMQ threads around, so I can't see how this thread is going to wake up from the usageMutex.wait() that it is blocking on.

Any thoughts? Is it wrong to produce straight from a message listener?

By the way, I noticed that increasing the memory available to the memory manager from 20 MB to 128 MB actually makes the application block sooner rather than later.

I also experimented with some prefetch policy settings, but these don't make much of a difference.

Currently, we're simply using

    vm://localhost?broker.useJmx=true&broker.persistent=false

as our broker URL. We're using topics throughout. Sessions' acknowledge modes are all set to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string and integer properties.

Cheers,

Albert
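To make the question about producing from a listener concrete, here is a minimal sketch of the suspected pattern in plain Java. It is a toy model, not ActiveMQ code: the class UsageLimitSketch and its methods are hypothetical, a Semaphore stands in for the usage limit, and it assumes that the space held by a dispatched message is only freed after the listener returns. Whether the broker behaves this way is not confirmed by the traces above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of a shared usage limit.
 * This is NOT ActiveMQ's UsageManager; each permit stands for the
 * memory taken up by one in-flight message.
 */
final class UsageLimitSketch {
    private final Semaphore space;

    UsageLimitSketch(int limit) {
        this.space = new Semaphore(limit);
    }

    /** Models send(): waits up to timeoutMs for space; false means it timed out. */
    boolean send(long timeoutMs) {
        try {
            return space.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Models the space freed once a dispatched message has been consumed. */
    void release() {
        space.release();
    }

    /**
     * A producer first sends `pending` messages. One of them is then
     * dispatched to a listener that calls send() itself. In this model the
     * dispatched message keeps its space until the listener returns.
     * Returns true if the listener's send found space, false if it timed out.
     */
    static boolean sendFromListener(int limit, int pending, long timeoutMs) {
        UsageLimitSketch usage = new UsageLimitSketch(limit);
        for (int i = 0; i < pending; i++) {
            if (!usage.send(0)) {
                throw new IllegalArgumentException("pending exceeds limit");
            }
        }
        boolean sent = usage.send(timeoutMs); // listener produces a new message
        usage.release();                      // listener returned; its message is consumed
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(sendFromListener(2, 2, 100)); // limit already full
        System.out.println(sendFromListener(2, 1, 100)); // one slot still free
    }
}
```

With the limit already full, the listener's send can only time out, because the one thread that could free space is the thread waiting for it. The real wait in the traces has no timeout, which would match the hang described if the assumption above holds.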
--
James
-------
http://radio.weblogs.com/0112098/