[
https://issues.apache.org/jira/browse/AMQ-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14594922#comment-14594922
]
Christopher L. Shannon commented on AMQ-5851:
---------------------------------------------
[~gtully] and [~tabish121],
Sorry in advance for the long message, but this is a complicated issue and it
was pretty hard to track down.
There is a race condition of sorts, caused by the unique circumstances of this
test case and the environment. The short version is that message acks are being
received out of order because messages are being handled concurrently in
different threads. Sometimes this causes an exception: by the time some of the
acks reach the broker, the broker has already expired the corresponding
messages, so the acknowledgement check fails. This is happening in Wildfly
because the container creates a pool of threads to handle multiple messages at
the same time for a single subscription. It could happen just as easily in
other applications that spin off new threads to handle multiple messages in
parallel.
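To make the ordering problem concrete, here is a hypothetical Java sketch (not
ActiveMQ code; the class name and message labels are made up) of two messages
from one subscription handed to a thread pool, the way the container does it. A
CountDownLatch stands in for the slow listener so the interleaving seen in the
log is forced deterministically; in the real system, timing alone produces it.

{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not ActiveMQ code: two messages from ONE
// subscription processed in parallel, so acks reach the broker out of order.
class OutOfOrderAcks {

    /** Returns the acks in the order they would reach the broker. */
    static List<String> simulate() {
        List<String> acksAtBroker = new CopyOnWriteArrayList<>();
        // Forces the interleaving from the log; in reality the MDB's 1 s sleep does this.
        CountDownLatch expiredAckSent = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Dispatched first: still valid, so the slow listener runs before the normal ack.
        pool.submit(() -> {
            try {
                expiredAckSent.await(); // stands in for the listener sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            acksAtBroker.add("STANDARD:msg-4");
        });

        // Dispatched later: already past its TTL, so it is acked as expired immediately.
        pool.submit(() -> {
            acksAtBroker.add("EXPIRED:msg-13");
            expiredAckSent.countDown();
        });

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acksAtBroker;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [EXPIRED:msg-13, STANDARD:msg-4]
    }
}
{code}

The pool needs at least two threads here; with one thread the first task would
block forever waiting on the latch.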
Here is what's going on: while a message producer is sending messages to the
broker with a TTL of 10 milliseconds, the Wildfly consumer is quickly dequeuing
messages in parallel. The run() method in ActiveMQSession is executed for each
new message in a different thread. The first thing that happens is that the
message is checked to see if it has expired (around line 885 of
ActiveMQSession). If it has not expired, the code continues and the
messageListener (in this case the MDB) eventually runs (around line 1037). The
example that was submitted here has the MDB sleeping for 1 second, which is
longer than the TTL of the messages being sent. So a bunch of threads sleep for
1 second, and when each one finishes, the run() method continues and at the end
sends a normal acknowledgement back to the broker in the finally block (around
line 1052).
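The client-side flow just described can be modeled roughly as follows. This is
a simplified, hypothetical Java sketch (Java 16+ for the record); the class
name, the simulated clock, and the "now > expiresAt" expiry rule are
assumptions for illustration, not the real ActiveMQSession code. The key point
is that expiry is checked only once, before the listener runs.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the per-message flow in ActiveMQSession.run().
// Names and the simulated clock are illustrative assumptions, not ActiveMQ's API.
class ClientFlowModel {
    enum AckType { EXPIRED, STANDARD }

    /** An ack as it leaves the client, stamped with the simulated send time. */
    record Ack(String messageId, AckType type, long sentAtMillis) {}

    final List<Ack> sent = new ArrayList<>();

    void run(String messageId, long expiresAtMillis, long startMillis, long listenerMillis) {
        // The only expiry check happens before the listener (around line 885).
        if (startMillis > expiresAtMillis) {
            sent.add(new Ack(messageId, AckType.EXPIRED, startMillis)); // MDB never fires
            return;
        }
        // The listener runs (around line 1037); here it simply takes listenerMillis.
        long finishedMillis = startMillis + listenerMillis;
        // The normal ack follows (around line 1052) with no second expiry check,
        // so it is sent even though the TTL elapsed while the listener was running.
        sent.add(new Ack(messageId, AckType.STANDARD, finishedMillis));
    }

    public static void main(String[] args) {
        ClientFlowModel session = new ClientFlowModel();
        // Sent at t=0 with a 10 ms TTL, consumed right away, listener sleeps 1 s.
        session.run("msg-4", 10, 0, 1_000);
        // Sent at t=1000 with a 10 ms TTL, but only consumed at t=1046.
        session.run("msg-13", 1_010, 1_046, 1_000);
        session.sent.forEach(System.out::println);
    }
}
{code}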
However, as the consumer keeps rapidly consuming more messages and spinning off
threads, eventually one of the threads runs into a message that has already
expired, because the timeout period is so short. Since the message is expired,
an expired ack is sent to the broker right away, and the MDB never fires for
that message. Receiving the expired ack triggers the broker to iterate through
all of the dispatched messages and expire the ones that it can. This happens in
the PrefetchSubscription class around line 325. Since all of the dispatched
messages have expired by then, essentially all of them get removed from the
dispatch list. Then, when the other threads in Wildfly finish sleeping, they
continue on and send back a normal ack. By the time that acknowledgement
reaches the broker, however, the message has already been expired (the thread
that detected the expired message already sent an expired ack), so the message
being acked no longer exists in the dispatched list.
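The broker side of the race can be sketched with a hypothetical, heavily
simplified stand-in for the per-subscription dispatched list. The class and
method names below are made up and the logic only mirrors the description
above (sweep on expired ack, exact match required on a normal ack); it is not
PrefetchSubscription's real implementation.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the broker's per-subscription dispatched list.
// Names and logic are illustrative only, not PrefetchSubscription's code.
class DispatchedListModel {
    // message id -> absolute expiration time (ms), in dispatch order
    private final Map<String, Long> dispatched = new LinkedHashMap<>();

    void dispatch(String id, long expiresAtMillis) {
        dispatched.put(id, expiresAtMillis);
    }

    /** An expired ack makes the broker sweep out every dispatched message that has expired. */
    void onExpiredAck(String id, long nowMillis) {
        dispatched.remove(id);
        Iterator<Map.Entry<String, Long>> it = dispatched.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis > it.next().getValue()) {
                it.remove();
            }
        }
    }

    /** A normal ack must match a message that is still in the dispatched list. */
    void onStandardAck(String id) {
        if (dispatched.remove(id) == null) {
            throw new IllegalStateException("Unmatched acknowledge: could not find Message-ID "
                    + id + " in dispatched-list");
        }
    }

    int size() {
        return dispatched.size();
    }

    public static void main(String[] args) {
        DispatchedListModel sub = new DispatchedListModel();
        sub.dispatch("msg-4", 10);         // sent at t=0 with a 10 ms TTL
        sub.dispatch("msg-13", 1_010);     // sent at t=1000 with a 10 ms TTL
        sub.onExpiredAck("msg-13", 1_046); // the sweep removes msg-4 as well
        try {
            sub.onStandardAck("msg-4");    // late normal ack from the sleeping thread
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}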
Below is a printout from my log file with some extra debug statements that I
added. The "handling message" line is printed right after a message is dequeued
in the run() method; you can see multiple threads executing at the same time in
Wildfly. "Expired Ack" is printed when a message is detected as being expired,
and "Sending Ack" is printed when a thread finishes normal execution and sends
a normal ack back to the broker.
As you can see below, we run into trouble when the thread handling the 4th
message finishes and sends an acknowledgement to the broker. While the thread
handling message 4 was sleeping for 1 second, messages 5 through 13 were
handled in parallel. Just before that thread finished, another thread detected
that message 13 had expired and sent back an expired ack, which caused the
broker to expire all of the dispatched messages (including message 4). So by
the time the ack for the 4th message reached the broker, that message had
already been removed from the dispatch list, and we get an exception.
So, as far as solutions go, I'm not sure what the best approach would be. This
situation could happen any time messages are processed slowly in parallel and a
TTL is set. One obvious workaround is to not process messages off the same
subscription concurrently, but concurrent processing is pretty common with MDBs
and some other environments. One solution could be to check a second time
whether the message has expired after the messageListener has been called, and
send an expired ack instead of a normal ack in that case. But we'd also need to
change the server logic to not fail if it can't find the message to ack
because it was already expired. Another solution could be to keep track of
which messages have already been expired on a subscription for some window of
time, so that if an acknowledgement arrives later it can be detected as being
for an expired message and no error is thrown. What do you guys think? I can
work on a PR and introduce a fix, but I wanted to discuss what you think would
be the best approach first.
{noformat}
20:20:52,347 INFO [org.apache.activemq.ra.ActiveMQEndpointWorker] (default-threads - 1) Successfully established connection to broker [tcp://localhost:61616]
20:21:00,745 INFO [stdout] (default-threads - 2) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:3:1
20:21:00,747 INFO [stdout] (default-threads - 2) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:3:1
20:21:00,763 INFO [stdout] (default-threads - 3) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1
20:21:00,868 INFO [stdout] (default-threads - 4) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1
20:21:00,980 INFO [stdout] (default-threads - 5) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:6:1
20:21:01,087 INFO [stdout] (default-threads - 6) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:7:1
20:21:01,199 INFO [stdout] (default-threads - 7) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:8:1
20:21:01,315 INFO [stdout] (default-threads - 8) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:9:1
20:21:01,316 INFO [stdout] (default-threads - 8) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:9:1
20:21:01,423 INFO [stdout] (default-threads - 9) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:10:1
20:21:01,539 INFO [stdout] (default-threads - 10) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:11:1
20:21:01,655 INFO [stdout] (default-threads - 11) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:12:1
20:21:01,768 INFO [stdout] (default-threads - 12) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:13:1
20:21:01,806 INFO [stdout] (default-threads - 12) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:13:1
20:21:01,809 INFO [stdout] (default-threads - 3) Hello world!3
20:21:01,810 INFO [stdout] (default-threads - 3) Sending Ack (ActiveMQSession line 1010) for: 2; ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1,ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1
20:21:01,872 INFO [stdout] (default-threads - 4) Hello world!4
20:21:01,872 INFO [stdout] (default-threads - 4) Sending Ack (ActiveMQSession line 1010) for: 2; ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1,ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1
20:21:01,874 INFO [stdout] (default-threads - 13) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:14:1
20:21:01,888 ERROR [org.apache.activemq.ra.ActiveMQEndpointWorker] (ActiveMQ
Connection Executor: tcp://localhost/127.0.0.1:61616@43165) Connection to
broker failed: Unmatched acknowledge: MessageAck {commandId = 16,
responseRequired = false, ackType = 2, consumerId =
ID:localhost.localdomain-46056-1434846051491-1:1:-1:2, firstMessageId =
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, lastMessageId =
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, destination =
queue://TEST.FOO, transactionId = null, messageCount = 1, poisonCause = null};
Could not find Message-ID
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1 in dispatched-list
(start of ack): javax.jms.JMSException: Unmatched acknowledge: MessageAck
{commandId = 16, responseRequired = false, ackType = 2, consumerId =
ID:localhost.localdomain-46056-1434846051491-1:1:-1:2, firstMessageId =
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, lastMessageId =
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, destination =
queue://TEST.FOO, transactionId = null, messageCount = 1, poisonCause = null};
Could not find Message-ID
ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1 in dispatched-list
(start of ack)
{noformat}
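The second proposal above (remembering recently expired messages for a window
of time) could look roughly like the following. This is only a hypothetical
Java sketch of the idea, not a patch: the class name, the AckResult values, and
the window length are all assumptions, and the real change would live in the
broker's subscription code.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "remember recently expired messages" proposal.
// Names and the window length are illustrative assumptions, not ActiveMQ code.
class ExpiryTolerantSubscription {
    enum AckResult { ACKED, IGNORED_ALREADY_EXPIRED }

    private final Map<String, Long> dispatched = new LinkedHashMap<>();      // id -> expiresAt
    private final Map<String, Long> recentlyExpired = new LinkedHashMap<>(); // id -> expired at
    private final long windowMillis;

    ExpiryTolerantSubscription(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void dispatch(String id, long expiresAtMillis) {
        dispatched.put(id, expiresAtMillis);
    }

    /** Sweep expired messages as before, but remember each one that is dropped. */
    void onExpiredAck(String id, long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = dispatched.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getKey().equals(id) || nowMillis > entry.getValue()) {
                recentlyExpired.put(entry.getKey(), nowMillis);
                it.remove();
            }
        }
    }

    /** A late ack for a message expired within the window is tolerated, not an error. */
    AckResult onStandardAck(String id, long nowMillis) {
        // Forget entries older than the window so the bookkeeping stays bounded.
        recentlyExpired.values().removeIf(expiredAt -> nowMillis - expiredAt > windowMillis);
        if (dispatched.remove(id) != null) {
            return AckResult.ACKED;
        }
        if (recentlyExpired.remove(id) != null) {
            return AckResult.IGNORED_ALREADY_EXPIRED;
        }
        throw new IllegalStateException("Unmatched acknowledge: could not find Message-ID " + id);
    }

    public static void main(String[] args) {
        ExpiryTolerantSubscription sub = new ExpiryTolerantSubscription(60_000);
        sub.dispatch("msg-4", 10);
        sub.dispatch("msg-13", 1_010);
        sub.dispatch("msg-14", 1_884);
        sub.onExpiredAck("msg-13", 1_046);
        System.out.println(sub.onStandardAck("msg-4", 1_050));  // IGNORED_ALREADY_EXPIRED
        System.out.println(sub.onStandardAck("msg-14", 1_060)); // ACKED
    }
}
{code}

The window is the trade-off in this sketch: it bounds the extra bookkeeping,
but an ack that arrives after the window has passed would still be rejected.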
> Unmatched acknowledge: MessageAck {commandId = 77, responseRequired = false,
> ackType = 2, ...Could not find Message-ID XXX in dispatched-list (start of
> ack)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-5851
> URL: https://issues.apache.org/jira/browse/AMQ-5851
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client
> Affects Versions: 5.10.0, 5.11.0, 5.11.1
> Reporter: Grijesh Saini
> Labels: ttl
> Attachments: AcknowledgeIssue.zip
>
>
> When a lot of messages expire because of the JMS client Time to Live (TTL)
> property, the error below appears and the consumer freezes:
> {code:xml}
> Connection to broker failed: Unmatched acknowledge: MessageAck {commandId =
> 77, responseRequired = false, ackType = 2, consumerId =XXX firstMessageId =
> ID:XXX
> lastMessageId = ID:XXX
> , destination = queue://abc, transactionId = null, messageCount = 1,
> poisonCause = null}; Could not find Message-ID in dispatched-list (start of
> ack)
> at org.apache.activemq.broker.region.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:477) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:212) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:441) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:484) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:87) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:87) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:277) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:97) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:550) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.command.MessageAck.visit(MessageAck.java:245) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188) [activemq-broker-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) [activemq-client-5.11.1.jar:5.11.1]
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) [activemq-client-5.11.1.jar:5.11.1]
> at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_25]
> {code}
> Steps to reproduce:
> 1. Enable the TTL property for the JMS client
> 2. Keep the TTL value very low, say 5 seconds
> 3. Send a lot of messages so that some of them expire
> 4. Make sure that some messages expire while they are being processed inside
> the MDB
> The error above will then appear in the logs
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)