Hello all

On Thu, 18 Jan 2007, James.Strachan wrote:

> Have you tried setting the UsageManager configuration to something large? Out
> of the box it's set to something really tiny.
> 
> BTW if you have an issue, please mention the version of the software you are
> using.
> http://incubator.apache.org/activemq/support.html
> James

Some more details from the day's debugging.

We're running ActiveMQ 4.2 from trunk as checked out on Friday.

Our test has a thread that produces messages. On another connection in 
the same application, we have a consumer with a message listener set. 
This listener produces a new message.

Debugging with Eclipse, I see that the first time the code goes into 
UsageManager.waitForSpace, it is called from the producer thread. 
The stack looks like this:

Thread [myapp$MyThread] (Suspended (breakpoint at line 91 in UsageManager))     
        UsageManager.waitForSpace() line: 91    
        UsageManager.waitForSpace() line: 88    
        Topic.send(ConnectionContext, Message) line: 248        
        ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
        ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
        TransactionBroker.send(ConnectionContext, Message) line: 197    
        AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126 
        CompositeDestinationBroker.send(ConnectionContext, Message) line: 98    
        BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
        TransportConnection.processMessage(Message) line: 449   
        ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604   
        TransportConnection.service(Command) line: 258  
        TransportConnection$1.onCommand(Object) line: 164       
        ResponseCorrelator.onCommand(Object) line: 95   
        MutexTransport(TransportFilter).onCommand(Object) line: 65      
        VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99    
        VMTransportServer$1(VMTransport).oneway(Object) line: 86        
        MutexTransport.oneway(Object) line: 44  
        ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69      
        ResponseCorrelator.request(Object) line: 74     
        ActiveMQConnection.syncSendPacket(Command) line: 1185   
        ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
        ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
        ActiveMQMessageProducer.send(Message) line: 358 
        <thread in my application calling send>

If I then allow the VM to continue, it runs for a short while and then 
the single remaining ActiveMQ Session Task thread blocks:

Thread [ActiveMQ Session Task] (Suspended (breakpoint at line 91 in UsageManager))
        UsageManager.waitForSpace() line: 91    
        UsageManager.waitForSpace() line: 88    
        Topic.send(ConnectionContext, Message) line: 248        
        ManagedTopicRegion(AbstractRegion).send(ConnectionContext, Message) line: 305
        ManagedRegionBroker(RegionBroker).send(ConnectionContext, Message) line: 381
        TransactionBroker.send(ConnectionContext, Message) line: 197    
        AdvisoryBroker(BrokerFilter).send(ConnectionContext, Message) line: 126 
        CompositeDestinationBroker.send(ConnectionContext, Message) line: 98    
        BrokerService$2(MutableBrokerFilter).send(ConnectionContext, Message) line: 136
        TransportConnection.processMessage(Message) line: 449   
        ActiveMQBytesMessage(ActiveMQMessage).visit(CommandVisitor) line: 604   
        TransportConnection.service(Command) line: 258  
        TransportConnection$1.onCommand(Object) line: 164       
        ResponseCorrelator.onCommand(Object) line: 95   
        MutexTransport(TransportFilter).onCommand(Object) line: 65      
        VMTransportServer$1(VMTransport).syncOneWay(Object) line: 99    
        VMTransportServer$1(VMTransport).oneway(Object) line: 86        
        MutexTransport.oneway(Object) line: 44  
        ResponseCorrelator.asyncRequest(Object, ResponseCallback) line: 69      
        ResponseCorrelator.request(Object) line: 74     
        ActiveMQConnection.syncSendPacket(Command) line: 1185   
        ActiveMQSession.send(ActiveMQMessageProducer, ActiveMQDestination, Message, int, int, long) line: 1547
        ActiveMQMessageProducer.send(Destination, Message, int, int, long) line: 473
        ActiveMQMessageProducer.send(Message) line: 358 
        
        <onMessage of listener in my application is called; it calls send>
        
        ActiveMQMessageConsumer.dispatch(MessageDispatch) line: 870     
        ActiveMQSessionExecutor.dispatch(MessageDispatch) line: 99      
        ActiveMQSessionExecutor.iterate() line: 166     
        PooledTaskRunner.runTask() line: 111    
        PooledTaskRunner.access$1(PooledTaskRunner) line: 95    
        PooledTaskRunner$1.run() line: 44       
        ThreadPoolExecutor$Worker.runTask(Runnable) line: 885   
        ThreadPoolExecutor$Worker.run() line: 907       
        Thread.run() line: 619  

There don't seem to be any other ActiveMQ threads around, so I can't 
see how this thread is going to wake up from the

usageMutex.wait();

that it is blocking on.

Any thoughts? Is it wrong to produce straight from a message listener?
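
To make the suspicion concrete, here is a minimal sketch in plain Java 
with no ActiveMQ classes. ToyUsage, listenerSendsInline and 
listenerHandsOff are made-up names. The key assumption, which I have not 
confirmed in the broker source, is that the space taken by an incoming 
message is only released once onMessage returns. A timed wait is used 
instead of a bare wait() so that the demo terminates.

```java
import java.util.concurrent.TimeUnit;

class UsageDeadlockSketch {

    /** Toy stand-in for a usage manager (hypothetical, not the ActiveMQ class). */
    static final class ToyUsage {
        private final Object mutex = new Object();
        private final long limit;
        private long used;

        ToyUsage(long limit) { this.limit = limit; }

        /** Waits until used < limit; returns false if that has not happened by the timeout. */
        boolean waitForSpace(long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            synchronized (mutex) {
                while (used >= limit) {
                    long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remaining <= 0) {
                        return false; // gave up: nobody freed any space
                    }
                    try {
                        mutex.wait(remaining);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
                return true;
            }
        }

        void increase(long n) { synchronized (mutex) { used += n; } }

        void decrease(long n) { synchronized (mutex) { used -= n; mutex.notifyAll(); } }
    }

    /** The listener sends on the dispatch thread itself, before the incoming message is released. */
    static boolean listenerSendsInline(long timeoutMillis) {
        ToyUsage usage = new ToyUsage(1);
        usage.increase(1);                                // incoming message fills the limit
        boolean sent = usage.waitForSpace(timeoutMillis); // send() inside onMessage
        usage.decrease(1);                                // release happens only after onMessage returns
        return sent;
    }

    /** The listener hands the send to another thread and returns straight away. */
    static boolean listenerHandsOff(long timeoutMillis) {
        ToyUsage usage = new ToyUsage(1);
        usage.increase(1);                                // incoming message fills the limit
        boolean[] sent = new boolean[1];
        Thread sender = new Thread(() -> sent[0] = usage.waitForSpace(timeoutMillis));
        sender.start();
        usage.decrease(1);                                // onMessage has returned, space is freed
        try {
            sender.join();                                // join makes sent[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent[0];
    }

    public static void main(String[] args) {
        System.out.println("inline send got through: " + listenerSendsInline(200));
        System.out.println("handed-off send got through: " + listenerHandsOff(2000));
    }
}
```

In this toy model the inline send times out, because the only thread 
that could free the space is the one waiting for it, while the 
handed-off send goes through. Whether the broker really behaves this way 
is exactly what I am asking.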

By the way, I noticed that increasing the memory available to the 
memory manager from 20 MB to 128 MB actually makes the application block 
sooner rather than later. I also experimented with some prefetch policy 
settings, but these don't make much of a difference. Currently, we're 
simply using

vm://localhost?broker.useJmx=true&broker.persistent=false

as our broker URL.

We're using topics throughout. Sessions' acknowledge modes are all set 
to DUPS_OK_ACKNOWLEDGE. We're doing some message selection with string 
and integer properties.

Cheers,

Albert
