I wonder if there is also clean up thread (ActiveMQ Cleanup Timer) in the mix, not sure about 4.2 but on trunk that is currently a fixed periodic task rather than a fixed delay task and it could end up hogging the jdbc connection.
2009/10/16 Manuel Teira <[email protected]> > Hello. > > We are embedding an activemq 4.2 broker in our application, running on a > Sun JVM 5, and using an Oracle 10g database as persistent messaging store > (without journal, since we are using a JDBC master-slave cluster). > In a pair of occasions, we were unable to get the broker started in our > production environment. Analyzing the situation, it seems that the cause was > the great amount of persistent messages for a given queue (over 75.000), > what was blocking the attempt to create consumers on that destination. > > Trying to reproduce the problem with a more up to date activemq version, we > set up a 5.2.0 standalone broker to resemble our scenario, and a client that > tries to create a set of consumers on the loaded queue. We exported the > table with messages from our production environment and filled the test > scenario database with it. I'm attaching both the activemq configuration and > the client used for testing. > > What we found was: > > The broker started normally. Since no attempt to consume messages from the > "loaded" destination was performed. > Once the client started, we observed that: > - We tried to start 10 consumers on the queue. Only one of those consumers > got the session.createConsumer(), others were blocked (note that all of them > are sharing the same connection) > - The only started consumer was unable to get any message, it was blocked > in the getMessage() attempt. > - The broker seems to be trying to load all the messages from the database, > showing the following stack dump (just the involved threads): > > "ActiveMQ Transport: tcp:///127.0.0.1:33554" daemon prio=4 tid=0x00ac2ef8 > nid=0x3c waiting on condition [0xaae7e000..0xaae7fbf0] > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:681) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:711) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1041) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:184) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:256) > at > org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:217) > at > org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:275) > - locked <0xe35bbb40> (a java.lang.Object) > at > org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:372) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86) > at > org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86) > at > org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93) > at > org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:541) > at > org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:345) > at > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206) > - locked <0xb8a0d5d0> (a > org.apache.activemq.transport.InactivityMonitor$1) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) > at java.lang.Thread.run(Thread.java:595) > > "QueueThread:queue://TaskManagerQueue" daemon prio=10 tid=0x005e1bd8 > nid=0x42 runnable [0xaac7e000..0xaac7faf0] > at oracle.jdbc.driver.T2CStatement.t2cDefineFetch(Native Method) > at > oracle.jdbc.driver.T2CPreparedStatement.doDefineFetch(T2CPreparedStatement.java:827) > at > oracle.jdbc.driver.T2CPreparedStatement.executeForRows(T2CPreparedStatement.java:768) > at > oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1062) > at > oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1132) > at > oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285) > at > oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3329) > - locked <0xe37a09a0> (a oracle.jdbc.driver.T2CPreparedStatement) > - locked <0xe379c398> (a oracle.jdbc.driver.T2CConnection) > at > org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91) > at > org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91) > at > org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709) > at > org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230) > at > org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83) > at > org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75) > at > org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:227) > - locked <0xb8a12f30> (a > org.apache.activemq.broker.region.cursors.QueueStorePrefetch) > at > org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:100) > at > org.apache.activemq.broker.region.cursors.StoreQueueCursor.reset(StoreQueueCursor.java:157) > - locked <0xb8a13648> (a > org.apache.activemq.broker.region.cursors.StoreQueueCursor) > at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1179) > - locked <0xb8a13648> (a > org.apache.activemq.broker.region.cursors.StoreQueueCursor) > at > org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1308) > at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1011) > - locked <0xb8a13748> (a org.apache.activemq.broker.region.Queue$2) > at > org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84) > at > org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675) > at java.lang.Thread.run(Thread.java:595) > > > We waited for 9 or 10 hours without apparent changes: Only one consumer > got, the other threads waiting, and no message consumed. > In the production environment, the only way to recover the situation we > found, was to move message from ACTIVEMQ_MSGS to another table, start the > broker with ACTIVEMQ_MSGS empty, and create an application to deserialize > messages from the table and inject them to the queue using the JMS API. > > We consider that over 10 hours of service unavailability is a serious > problem. The problem seems to be related with the broker trying to get all > the messages from the table at once whenever a consumer for a given > destination is created. Is this a known problem? Is there any way to improve > the situation? > > Best regards. > > > -- > Manuel. > > > package es.tid.planb.test; > > import java.util.concurrent.CountDownLatch; > import java.io.*; > import javax.jms.Connection; > import javax.jms.ConnectionFactory; > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.MessageProducer; > import javax.jms.MessageConsumer; > import javax.jms.Session; > import javax.jms.Queue; > import javax.jms.TemporaryQueue; > import javax.jms.TextMessage; > import javax.jms.Message; > import javax.naming.Context; > import javax.naming.InitialContext; > import javax.naming.NamingException; > import java.util.Date; > import java.util.Properties; > > public class MultithreadConsumer > { > volatile static long consumedCount = 0; > static int recvTimeout = 2000; > > public static void main(String args[]) throws Exception > { > if (args.length < 5) { > System.err.println("Usage: MultithreadConsumer " > + "uri queue threads messages synchro"); > System.exit(255); > } > > Properties jndiProperties = new Properties(); > jndiProperties.setProperty("java.naming.factory.initial", > > "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); > jndiProperties.setProperty("java.naming.provider.url", args[0]); > jndiProperties.setProperty("queue." + args[1], args[1]); > > Context jndiContext = new InitialContext(jndiProperties); > ConnectionFactory cfactory = > (ConnectionFactory) jndiContext.lookup("ConnectionFactory"); > final Destination queue = (Destination) jndiContext.lookup(args[1]); > final Connection conn = cfactory.createConnection(); > final int consumerCount = Integer.parseInt(args[2]); > final int messages = Integer.parseInt(args[3]); > final boolean synchronizeConsumers = > "true".equals(args[4]) ? true : false; > final CountDownLatch consumersPrepared = new > CountDownLatch(consumerCount); > final CountDownLatch consumersGo = new CountDownLatch(1); > > > System.err.println("Consumers: " + consumerCount > + ", Messages: " + messages > + ", Synchronized: " + synchronizeConsumers); > > conn.start(); > Thread consumers[] = new Thread[consumerCount]; > for (int i = 0; i < consumerCount; i++) { > consumers[i] = new Thread(new Runnable() { > public void run() { > Session session = null; > Message msg = null; > try { > session = conn.createSession(true, 0); > MessageConsumer consumer = > session.createConsumer(queue); > System.out.println("Consumer ready"); > if (synchronizeConsumers) { > consumersPrepared.countDown(); > consumersGo.await(); > } > while (consumedCount < messages) { > msg = consumer.receive(recvTimeout); > if (msg != null) { > System.out.print("m"); > session.commit(); > ++consumedCount; > } else { > System.out.print("!"); > } > } > System.out.println("Consumer finishing"); > } catch (Exception e) { > System.err.println("Exception in consumer"); > e.printStackTrace(System.err); > } > if (session != null) { > try { session.close(); } catch (Exception e) {}; > } > } > }, "Consumer#" + i); > } > for (int i = 0; i < consumers.length; ++i) { > consumers[i].start(); > } > > if (synchronizeConsumers) { > System.out.println("Waiting for the consumers to get ready"); > consumersPrepared.await(); > System.out.println("All the consumers ready"); > consumersGo.countDown(); > } > > Date before = new Date(); > > for (int i = 0; i < consumerCount; ++i) { > while (true) { > try { > consumers[i].join(); > break; > } catch (InterruptedException ie) {} > } > } > Date after = new Date(); > long elapsed = after.getTime() - before.getTime(); > System.err.println("Consumers finished. Messages: " > + consumedCount + ", " + elapsed + " ms > elapsed"); > System.err.println((consumedCount * 1000.0 / elapsed) + " > messages/s"); > conn.close(); > } > } > > > -- http://blog.garytully.com Open Source Integration http://fusesource.com
