Jean-Yves Nicolas created AMQ-7359:
--------------------------------------

             Summary: ConcurrentModificationException on client side followed 
by high memory consumption on server side
                 Key: AMQ-7359
                 URL: https://issues.apache.org/jira/browse/AMQ-7359
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.15.3
         Environment: OS: Red Hat Enterprise Linux Server release 7.6 (Maipo)
ActiveMQ standalone version 5.15.3
Producer: J2EE application running on JBoss EAP 7.1

            Reporter: Jean-Yves Nicolas
         Attachments: image-2019-12-06-15-44-40-467.png

Hello,

On our production environment, we use a standalone instance of activemq 5.15.3. 
We observed after a while that the heap consumption increases. We took a memory 
dump to help us to find the cause of the problem.

We saw that one of the queue uses lot of memory. The memory is used by the 
attribute pendingCachedIds (LinkedList) that contains for the example bellow 
1014378 items. We compare this list with other queue. The pendingCachedIds 
attribute is empty for other queue.

See here a screenshot from MAT:
 !image-2019-12-06-15-44-40-467.png!

We then inspected the first element of the list. The futureOrSequenceLong 
attribute of this first item allow us to inspect (at least partially) the xml 
message that was produced and we identify the date of production, as well as 
the message identifier. Note that the class name of the futureOrSequenceLong 
attribute is 
org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask$InnerFutureTask for 
the first Item, but it is java.lang.Long for subsequent items.

Having identify the business message and the get the time when this message was 
produced, we inspect both logs of activemq itself and the producer client 
application (J2EE application running in a JBoss EAP).

We did not see any unusual log in activemq server. We can see the 
sending/preprocess/postproccess/ack events.

On producer side, we can see the folllowing stacktrace:
{code:java}
Caused by: java.lang.RuntimeException: javax.jms.JMSException : 
java.util.concurrent.ExecutionException: 
java.util.ConcurrentModificationException
 at 
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
 at 
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1399)
 at 
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1428)
 at 
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1323)
 at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1974)
 at 
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:288)
 at 
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:223)
 at 
org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
...
Caused by: java.lang.Exception: java.lang.Throwable : 
java.util.concurrent.ExecutionException: 
java.util.ConcurrentModificationException
 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:881)
 at org.apache.activemq.broker.region.Queue.send(Queue.java:743)
 at 
org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:505)
 at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:459)
 at 
org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
 at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
 at 
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
 at 
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:293)
 at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
 at 
org.apache.activemq.broker.util.LoggingBrokerPlugin.send(LoggingBrokerPlugin.java:275)
 at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
 at 
org.apache.activemq.broker.util.TimeStampingBrokerPlugin.send(TimeStampingBrokerPlugin.java:131)
 at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
 at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
 at 
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:572)
 at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)
 at 
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:330)
 at 
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:194)
 at 
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
 at 
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
 at 
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
 at 
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
 at 
org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
 at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
 at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: java.util.ConcurrentModificationException
 at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
 at java.util.HashMap$KeyIterator.next(HashMap.java:1466)
 at 
org.apache.activemq.util.MarshallingSupport.marshalPrimitiveMap(MarshallingSupport.java:61)
 at org.apache.activemq.command.Message.beforeMarshall(Message.java:261)
 at 
org.apache.activemq.command.ActiveMQTextMessage.beforeMarshall(ActiveMQTextMessage.java:120)
 at 
org.apache.activemq.openwire.v11.MessageMarshaller.looseMarshal(MessageMarshaller.java:281)
 at 
org.apache.activemq.openwire.v11.ActiveMQMessageMarshaller.looseMarshal(ActiveMQMessageMarshaller.java:111)
 at 
org.apache.activemq.openwire.v11.ActiveMQTextMessageMarshaller.looseMarshal(ActiveMQTextMessageMarshaller.java:111)
 at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:162)
 at 
org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore.addMessage(KahaDBStore.java:453)
 at 
org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask.run(KahaDBStore.java:1428)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624){code}
It's interesting to know that on consumer side, the message was well processed 
(not business inconsistency).

Our activemq instance uses persistence with a quite basic configuration:
{code:java}
<persistenceAdapter>
  <kahaDB directory="/nfs/amq-data"/>
</persistenceAdapter>{code}
Producers and consumers use this transport connector:
{noformat}
<transportConnector name="openwire" 
uri="ssl://0.0.0.0:61616?needClientAuth=true"/>{noformat}
 

We have several interrogations:
 * Why the ConcurrentModificationException is raised? 
 ** What are those modifications that can be triggered simultaneously?
 * Why, after this exception, the size of pendingCachedIds grows?
 * Is there any way, as workaround, to clean the pendingCachedIds without 
stopping ActiveMQ
 ** Our current workaround is to restart the ActiveMQ

About the items in this pendingCachedIds, we can tell that:
 * The first element is not the first element ever enqueued in the queue. We 
are sure that before the raise of the ConcurrentModificationException, the size 
of the list is limited. We did a memory dump of the ActiveMQ a couple of days 
after a restart to validate this assumption.
 * After a quick math, the size of the pendingCachedIds list seems to be the 
equal to the number of messages produced since the raise of the exception.

 

Thanks a lot for your help

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to