Marcus Bergner created AMQ-7188:
-----------------------------------
Summary: ConcurrentModificationException during message dispatching seems to lead to messages being lost
Key: AMQ-7188
URL: https://issues.apache.org/jira/browse/AMQ-7188
Project: ActiveMQ
Issue Type: Bug
Components: activemq-camel, Broker
Affects Versions: 5.15.8
Environment: ActiveMQ 5.15.8 (conf/activemq.xml attached)
KahaDB
STOMP text messages (xml bodies)
Messages *sent to topics and routed to multiple queues* by Camel and then
consumed from those queues. Typical routing rules look like this:
{noformat}
<route id="routeABC">
  <from uri="activemq:topic:someTopic"/>
  <filter>
    <xpath>
      $SomeHeader = 'X' or
      $SomeHeader = 'Y'
    </xpath>
    <to uri="activemq:queue:queueABC?jmsKeyFormatStrategy=passthrough"/>
  </filter>
</route>
<route id="routeXYZ">
  <from uri="activemq:topic:someTopic"/>
  <filter>
    <xpath>
      $SomeHeader = 'X'
    </xpath>
    <to uri="activemq:queue:queueXYZ?jmsKeyFormatStrategy=passthrough"/>
  </filter>
</route>
{noformat}
Reporter: Marcus Bergner
Attachments: activemq.xml
I have been trying to track down an issue that is difficult to reproduce and pinpoint: messages sometimes seem not to be delivered properly to queue consumers. The best clue I have at this point is that we occasionally see exceptions like the one below, and I can make them appear relatively easily by running a heavier workload through our system.
{noformat}
2019-04-25 14:04:00,419 | DEBUG | Async client internal exception occurred with no exception listener registered: java.util.ConcurrentModificationException | org.apache.activemq.ActiveMQConnection | ActiveMQ VMTransport: vm://localhost#108
java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
	at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
	at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
	at java.util.HashMap.putMapEntries(HashMap.java:512)
	at java.util.HashMap.<init>(HashMap.java:490)
	at org.apache.activemq.command.Message.copy(Message.java:160)
	at org.apache.activemq.command.ActiveMQMessage.copy(ActiveMQMessage.java:69)
	at org.apache.activemq.command.ActiveMQTextMessage.copy(ActiveMQTextMessage.java:58)
	at org.apache.activemq.command.ActiveMQTextMessage.copy(ActiveMQTextMessage.java:53)
	at org.apache.activemq.ActiveMQConnection$3.processMessageDispatch(ActiveMQConnection.java:1840)
	at org.apache.activemq.command.MessageDispatch.visit(MessageDispatch.java:113)
	at org.apache.activemq.ActiveMQConnection.onCommand(ActiveMQConnection.java:1828)
	at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
	at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:275)
	at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)
	at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
{noformat}
I found the somewhat similar issues AMQ-4092, AMQ-5664, and AMQ-5934. Those, along with
https://stackoverflow.com/questions/12644272/activemq-message-groups-concurrentmodificationexception
also indicate that messages are being dispatched concurrently, and that a potential workaround is the KahaDB setting {{concurrentStoreAndDispatchQueues="false"}}, which, according to what I've read, can have serious throughput implications.
I looked briefly at the ActiveMQ code from git (the activemq-5.15.x branch). Based on the stack trace above and the way the current code looks, it seems to me that the incoming {{ActiveMQTextMessage}} instance may be handed to N concurrent dispatchers, each of which then tries to {{copy}} the message, and that this copying suffers from a race condition.
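A minimal stand-alone Java sketch of the suspected race, assuming the properties map is a plain {{HashMap}} shared between threads without locking: several dispatcher threads copy it with {{new HashMap<>(shared)}}, the same operation that fails at {{Message.java:160}} in the trace, while another thread adds and removes a key. The class {{SharedCopyRace}} and its {{run}} method are hypothetical illustrations, not ActiveMQ code. Because the outcome depends on thread timing, the number of exceptions varies between runs and can be zero.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the suspected race, not ActiveMQ code: dispatcher
// threads copy one shared, unsynchronized HashMap (as Message.copy() does
// with new HashMap<>(properties)) while another thread keeps modifying it.
public class SharedCopyRace {

    /** Returns how many copies failed with ConcurrentModificationException. */
    public static int run(int dispatchers, int writerIterations) throws InterruptedException {
        Map<String, Object> shared = new HashMap<>();    // plain HashMap, no locking
        for (int i = 0; i < 16; i++) {
            shared.put("prop" + i, i);
        }
        AtomicBoolean writerDone = new AtomicBoolean(false);
        AtomicInteger failures = new AtomicInteger();

        Thread[] copiers = new Thread[dispatchers];
        for (int d = 0; d < dispatchers; d++) {
            copiers[d] = new Thread(() -> {
                long copiedEntries = 0;                  // use each copy so it is not dead code
                while (!writerDone.get()) {
                    try {
                        copiedEntries += new HashMap<>(shared).size();
                    } catch (ConcurrentModificationException e) {
                        failures.incrementAndGet();
                    }
                }
            });
            copiers[d].start();
        }

        // Writer: structural changes (adding and removing a key) on the shared map.
        for (int i = 0; i < writerIterations; i++) {
            shared.put("extra", i);
            shared.remove("extra");
        }
        writerDone.set(true);
        for (Thread t : copiers) {
            t.join();
        }
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Timing dependent: the count varies between runs and may be 0.
        System.out.println("ConcurrentModificationExceptions: " + run(4, 200_000));
    }
}
```

If this model matches what happens inside the broker, the fix has to come from either a map type that tolerates concurrent iteration or from making sure the source message is not mutated while it is being copied.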
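Looking at the {{Message}} base class, the exception happens on line 160:\\
{{copy.properties = new HashMap<String, Object>(properties);}}\\
The {{HashMap}} copy constructor iterates over {{properties}}, and that iterator throws {{ConcurrentModificationException}} when it detects that the map was structurally modified while the copy was in progress, which suggests that another thread is modifying the same message's properties concurrently.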
* Should it perhaps be using {{ConcurrentHashMap}}?
* Could the parent thread that delegates message dispatching to other threads end up modifying, or even clearing, the source object before all dispatchers have finished copying it? I did not dig deep enough to work out how this dispatching is done or what degree of synchronization exists there.
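Both options raised in the questions above can be sketched in a hypothetical stand-alone form. This is not ActiveMQ code: {{SafePropertyCopy}}, {{Props}}, {{ConcurrentProps}}, {{LockedProps}} and {{stress}} are illustrative names. Option 1 backs the properties with {{ConcurrentHashMap}}, whose iterators are weakly consistent and never throw {{ConcurrentModificationException}}. Option 2 keeps a {{HashMap}} but performs every modification and the copy while holding the same lock, so a copy can never overlap a change to the source object. The writer always adds or removes keys in pairs, so an odd-sized copy reveals a copy taken halfway through an update.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not ActiveMQ code: two ways to copy a property map
// safely while another thread may be modifying it.
public class SafePropertyCopy {

    /** Minimal message-properties abstraction used by both options. */
    public interface Props {
        void putPair(int i);
        void removePair(int i);
        Map<String, Object> copy();
    }

    /** Option 1: back the properties with a ConcurrentHashMap. */
    public static final class ConcurrentProps implements Props {
        // ConcurrentHashMap rejects null keys and null values.
        private final Map<String, Object> props = new ConcurrentHashMap<>();

        @Override public void putPair(int i) {          // two puts, not atomic as a pair
            props.put("a" + i, i);
            props.put("b" + i, i);
        }

        @Override public void removePair(int i) {
            props.remove("a" + i);
            props.remove("b" + i);
        }

        @Override public Map<String, Object> copy() {
            // Weakly consistent iteration: never throws CME, but the copy may
            // reflect a pair that is only half added or half removed.
            return new HashMap<>(props);
        }
    }

    /** Option 2: keep a HashMap, but guard every access with one lock. */
    public static final class LockedProps implements Props {
        private final Map<String, Object> props = new HashMap<>();

        @Override public synchronized void putPair(int i) {
            props.put("a" + i, i);
            props.put("b" + i, i);
        }

        @Override public synchronized void removePair(int i) {
            props.remove("a" + i);
            props.remove("b" + i);
        }

        @Override public synchronized Map<String, Object> copy() {
            // Runs under the same lock as the writers: a point-in-time snapshot.
            return new HashMap<>(props);
        }
    }

    /** Returns {CME count, copies with an odd number of entries}. */
    public static int[] stress(Props p, int iterations) throws InterruptedException {
        AtomicBoolean done = new AtomicBoolean(false);
        int[] result = new int[2];                      // written only by the copier thread
        Thread copier = new Thread(() -> {
            while (!done.get()) {
                try {
                    if (p.copy().size() % 2 != 0) {
                        result[1]++;                    // torn copy observed
                    }
                } catch (ConcurrentModificationException e) {
                    result[0]++;
                }
            }
        });
        copier.start();
        for (int i = 0; i < iterations; i++) {          // writer: add then remove a pair
            p.putPair(i);
            p.removePair(i);
        }
        done.set(true);
        copier.join();                                  // join() makes result[] visible here
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] c = stress(new ConcurrentProps(), 100_000);
        int[] l = stress(new LockedProps(), 100_000);
        System.out.println("ConcurrentHashMap: CMEs=" + c[0] + ", torn copies=" + c[1]);
        System.out.println("synchronized:      CMEs=" + l[0] + ", torn copies=" + l[1]);
    }
}
```

In this sketch the {{ConcurrentHashMap}} variant removes the exception but can still produce torn copies, while the locked variant always yields a consistent snapshot at the cost of contention between writers and copiers. A third option, closer to the second question, is to guarantee that the source message is never mutated after it has been handed to the dispatchers.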
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)