[ https://issues.apache.org/jira/browse/AMQ-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Tully resolved AMQ-5854.
-----------------------------
Resolution: Fixed
I have implemented a rollback-only option on the transaction context that is
triggered by a failover, that is, by the initial disconnect that triggers a reconnect.
If there are pending acks (messages that need an ack in the transaction), a
rollback is forced. This seems the only sensible option.
There is a new unit test that simulates a delayed or slow ack on the consumer side,
using an additional transaction synchronization, and that reliably reproduces the
missed ack.
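To make the resolution concrete, here is a minimal Java sketch of a rollback-only flag. It is a hypothetical model, not the ActiveMQ TransactionContext API: the class TxContextModel and its methods are illustrative, and the sketch assumes the only state that matters is the set of message ids delivered in the transaction whose acks are still pending. A real JMS client would report the forced rollback as an exception such as javax.jms.TransactionRolledBackException; the model simply returns an outcome.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a rollback-only flag; not the ActiveMQ TransactionContext API.
final class TxContextModel {
    enum Outcome { COMMITTED, ROLLED_BACK }

    // Ids of messages delivered in this transaction whose acks are still pending.
    private final List<String> pendingAcks = new ArrayList<>();
    private boolean rollbackOnly;

    void delivered(String messageId) {
        pendingAcks.add(messageId);
    }

    // Called on the initial disconnect, before the failover transport reconnects.
    void onTransportInterrupted() {
        // Pending acks may be lost and the broker may redispatch those messages,
        // so committing this transaction could produce a duplicate.
        if (!pendingAcks.isEmpty()) {
            rollbackOnly = true;
        }
    }

    boolean isRollbackOnly() {
        return rollbackOnly;
    }

    // Ends the transaction; a rollback-only transaction is rolled back instead.
    Outcome commit() {
        Outcome outcome = rollbackOnly ? Outcome.ROLLED_BACK : Outcome.COMMITTED;
        pendingAcks.clear();
        rollbackOnly = false;
        return outcome;
    }
}
```

A transaction with no pending acks is unaffected by the disconnect, which keeps the forced rollback limited to the case that can actually lose an ack.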
> Duplicate messages when failover is done during prepare phase of two phase
> commit.
> ----------------------------------------------------------------------------------
>
> Key: AMQ-5854
> URL: https://issues.apache.org/jira/browse/AMQ-5854
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker, JMS client
> Affects Versions: 5.9.1, 5.10.2, 5.11.1
> Environment: Ubuntu or AIX
> ActiveMQ 5.9.1 (problem is reproduced in 5.10.2 and 5.11.1)
> Spring DMLC
> XA transactions with atomikos 3.7.0 (problem is also reproduced with 3.9.15)
> Persistent messages
> Multithreading (this problem occurs when there are at least 2 consumers on a
> queue)
> Reporter: Michael
> Assignee: Gary Tully
> Attachments:
> ActiveMQMessageConsumer-5.10.2-ModifyWithThreadSleep.java,
> ActiveMQMessageConsumer-5.11.1-ModifyWithThreadSleep.java, amq5854.tar.gz
>
>
> Use case :
> With Spring DMLC, read a JMS message from a queue, produce a
> JMS message on an output queue and write data to a database.
> Problem description :
> Due to high CPU usage, the inactivity monitor closed the
> connections between clients and broker while 16 messages were being processed.
> {noformat}
> 2015-06-01 04:39:01,130 | WARN | Transport Connection to: tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://*** | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker
> {noformat}
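The inactivity monitor fails a connection when nothing has been read from it within the configured window (30000 ms in the log above). A minimal sketch of that read check follows, assuming timestamps are supplied by the caller; InactivityCheckModel is a hypothetical class, not ActiveMQ's AbstractInactivityMonitor. Under heavy CPU load the threads that should read or send keep-alives may not run in time, which would let the window expire even though the peer is alive.

```java
// Hypothetical model of a read-inactivity check; not ActiveMQ's AbstractInactivityMonitor.
final class InactivityCheckModel {
    private final long maxInactivityMillis;
    private long lastReadMillis;

    InactivityCheckModel(long maxInactivityMillis, long nowMillis) {
        this.maxInactivityMillis = maxInactivityMillis;
        this.lastReadMillis = nowMillis;
    }

    // Any inbound command, including a keep-alive, counts as activity.
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    // True when nothing was read for longer than the window: the connection is failed.
    boolean shouldFail(long nowMillis) {
        return nowMillis - lastReadMillis > maxInactivityMillis;
    }
}
```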
> 15 messages were rolled back and redelivered to another
> consumer.
> In the log we got 15 warnings :
> {noformat}
> ActiveMQMessageConsumer |WARN |jmsContainer-173|rolling back transaction (XID:***) post failover recovery. 1 previously delivered message(s) not replayed to consumer: ***
> {noformat}
> But one message was not rolled back (its transaction committed)
> and was also redelivered to another consumer. So it was processed twice by two
> different consumers (two inserts in the database and two output JMS messages
> generated) and was not deduplicated.
> In the ActiveMQ log we got this message :
> {noformat}
> WARN | Async error occurred: | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///***
> javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId = ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, transactionId = XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737], messageCount = 1, poisonCause = null}; Could not find Message-ID ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
> {noformat}
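The "Unmatched acknowledge" warning and the duplicate are consistent with the following sequence, sketched as a hypothetical Java model (DispatchModel is illustrative, not the broker API): the broker tracks which messages are dispatched to each consumer; when a consumer's connection fails, its unacked messages go back to the queue and can be dispatched to another consumer; an ack that arrives afterwards from the original consumer no longer matches anything in its dispatched list.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-consumer dispatch tracking; not the ActiveMQ broker API.
final class DispatchModel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<String, Set<String>> dispatched = new HashMap<>();

    void enqueue(String messageId) {
        queue.addLast(messageId);
    }

    // Dispatches the next message to a consumer and records it as in flight.
    String dispatch(String consumerId) {
        String id = queue.pollFirst();
        if (id != null) {
            dispatched.computeIfAbsent(consumerId, k -> new LinkedHashSet<>()).add(id);
        }
        return id;
    }

    // On connection failure, the consumer's unacked messages return to the head of the queue.
    void connectionFailed(String consumerId) {
        Set<String> inFlight = dispatched.remove(consumerId);
        if (inFlight == null) {
            return;
        }
        // Re-add in reverse so the original dispatch order is preserved at the head.
        String[] ids = inFlight.toArray(new String[0]);
        for (int i = ids.length - 1; i >= 0; i--) {
            queue.addFirst(ids[i]);
        }
    }

    // Returns false for an "unmatched acknowledge": the id is not in this consumer's list.
    boolean ack(String consumerId, String messageId) {
        Set<String> inFlight = dispatched.get(consumerId);
        return inFlight != null && inFlight.remove(messageId);
    }
}
```

In the model, the late ack from the first consumer is rejected while the second consumer processes and acks the same message; if the first consumer's XA transaction still commits its database insert and output message, the work is done twice.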
> For this duplicated message, the failover occurred during the
> prepare phase of the commit :
> {noformat}
> [{2015/06/01 04:39:50,322 |FailoverTransport |WARN |jmsContainer-152|Transport (tcp://***) failed, reason: , attempting to automatically reconnect}]
> org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
> at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
> at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
> at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
> at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
> at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
> at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
> at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
> at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
> at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
> at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
> at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
> at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
> at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
> at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
> at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
> at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
> at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
> at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
> at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
> at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
> at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
> at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
> at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
> at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
> at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
> at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
> at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
> at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
> at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
> at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
> at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
> at java.lang.Thread.run(Thread.java:761)
> {noformat}
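Read bottom-up, the trace shows the deferred ack being sent from inside the XA prepare: Spring's JtaTransactionManager commits, Atomikos prepares and first calls end() on the resource, TransactionContext.end() runs the consumer's beforeEnd() synchronization, and that synchronization calls acknowledge(), whose send hits the failed channel. Below is a hypothetical Java model of that ordering; XaCallOrderModel and its methods are illustrative and do not match the Atomikos or ActiveMQ signatures. In the model a failed ack send makes prepare vote rollback, which follows the intent of the fix described in the resolution comment rather than the behaviour reported here, where the transaction committed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the call order in the trace; not the Atomikos or ActiveMQ API.
final class XaCallOrderModel {
    interface Transport {
        void oneway(String command) throws IOException;
    }

    final List<String> calls = new ArrayList<>();
    private final Transport transport;

    XaCallOrderModel(Transport transport) {
        this.transport = transport;
    }

    // Returns true if the resource votes to commit during prepare.
    boolean commit() {
        calls.add("commit");
        return prepare();
    }

    private boolean prepare() {
        calls.add("prepare");
        return end();
    }

    private boolean end() {
        calls.add("end");
        // The beforeEnd synchronization sends the deferred ack at this point,
        // which is where the transport failure surfaced in the trace.
        try {
            calls.add("sendAck");
            transport.oneway("MessageAck");
            return true;
        } catch (IOException e) {
            calls.add("sendAckFailed");
            return false; // vote rollback rather than commit without a delivered ack
        }
    }
}
```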
> Our analysis :
> We think that the duplicate message is caused by the failover
> during the prepare phase of the commit, so we modified the source code to
> reproduce the case.
> Configuration changes we made to provoke failovers:
> broker : transport.useKeepAlive=false
> client : wireFormat.maxInactivityDuration=5000
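The two settings above would typically go on the broker's transport connector URI and on the nested tcp:// URI inside the client's failover: URI. The small sketch below builds both strings; the host names and ports are placeholders and only the option names come from the issue. The client string could then be passed to the ActiveMQConnectionFactory(String brokerURL) constructor.

```java
// Builds illustrative URIs for the failover test setup; hosts and ports are placeholders.
final class FailoverTestUris {
    // Broker transport connector URI with keep-alives disabled.
    static String brokerConnector(String host, int port) {
        return "tcp://" + host + ":" + port + "?transport.useKeepAlive=false";
    }

    // Client failover URI; the wire format option applies to the nested tcp transport.
    static String clientFailover(String host, int port, long maxInactivityMillis) {
        return "failover:(tcp://" + host + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMillis + ")";
    }
}
```

With a 5000 ms window on the client, a short stall on either side is enough to make the client consider the connection inactive and fail over, which is what makes the race easy to hit.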
> We added a Thread.sleep to the source code of
> org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen
> exactly where we think it causes the problem :
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#acknowledge()|borderStyle=solid}
> public void acknowledge() throws JMSException {
>     clearDeliveredList();
>     waitForRedeliveries();
>     synchronized(deliveredMessages) {
>         // BEGIN MODIFIED CODE
>         LOG.warn("start sleeping 20 seconds to test failover");
>         try {
>             Thread.sleep(1000 * 20);
>         } catch (InterruptedException e) {
>             LOG.error("Exception :", e);
>         }
>         LOG.warn("end sleeping 20 seconds to test failover");
>         // END MODIFIED CODE
>         // Acknowledge all messages so far.
>         MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>         if (ack == null)
>             return; // no msgs
>         if (session.getTransacted()) {
>             rollbackOnFailedRecoveryRedelivery();
>             session.doStartTransaction();
>             ack.setTransactionId(session.getTransactionContext().getTransactionId());
>         }
>         pendingAck = null;
>         session.sendAck(ack);
>         // Adjust the counters
>         deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
>         additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
>         if (!session.getTransacted()) {
>             deliveredMessages.clear();
>         }
>     }
> }
> {code}
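The broker log earlier reports a single standard ack (ackType = 2) whose firstMessageId and lastMessageId are equal, with messageCount = 1; in general such an ack covers a range of delivered messages. Below is a hypothetical sketch of building one ack for all delivered messages, assuming the list is kept in delivery order; RangeAckModel is illustrative and is not ActiveMQ's MessageAck or makeAckForAllDeliveredMessages(). Because acknowledge() above builds its ack while holding the deliveredMessages lock, after the injected sleep, the ack in the reproduction can refer to messages the broker has already redispatched.

```java
import java.util.List;

// Hypothetical sketch of a range ack; not ActiveMQ's MessageAck.
final class RangeAckModel {
    static final class RangeAck {
        final String firstMessageId;
        final String lastMessageId;
        final int messageCount;

        RangeAck(String firstMessageId, String lastMessageId, int messageCount) {
            this.firstMessageId = firstMessageId;
            this.lastMessageId = lastMessageId;
            this.messageCount = messageCount;
        }
    }

    // Builds one ack covering every delivered message, or null when nothing is pending.
    static RangeAck ackForAllDelivered(List<String> deliveredInOrder) {
        if (deliveredInOrder.isEmpty()) {
            return null; // mirrors the "no msgs" early return in acknowledge()
        }
        return new RangeAck(
                deliveredInOrder.get(0),
                deliveredInOrder.get(deliveredInOrder.size() - 1),
                deliveredInOrder.size());
    }
}
```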
>
> With these changes to the configuration and the code, the
> problem is easily reproduced.
> We also tried with transactedIndividualAck=true, and added a
> Thread.sleep in the code :
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#registerSync()|borderStyle=solid}
> private void registerSync() throws JMSException {
>     session.doStartTransaction();
>     if (!synchronizationRegistered) {
>         synchronizationRegistered = true;
>         session.getTransactionContext().addSynchronization(new Synchronization() {
>             @Override
>             public void beforeEnd() throws Exception {
>                 if (transactedIndividualAck) {
>                     clearDeliveredList();
>                     waitForRedeliveries();
>                     synchronized(deliveredMessages) {
>                         // BEGIN MODIFIED CODE
>                         LOG.warn("start sleeping 20 seconds to test failover");
>                         try {
>                             Thread.sleep(1000 * 20);
>                         } catch (InterruptedException e) {
>                             LOG.error("Exception :", e);
>                         }
>                         LOG.warn("end sleeping 20 seconds to test failover");
>                         // END MODIFIED CODE
>                         rollbackOnFailedRecoveryRedelivery();
>                     }
>                 } else {
>                     acknowledge();
>                 }
>                 synchronizationRegistered = false;
>             }
>
>             @Override
>             public void afterCommit() throws Exception {
>                 commit();
>                 synchronizationRegistered = false;
>             }
>
>             @Override
>             public void afterRollback() throws Exception {
>                 rollback();
>                 synchronizationRegistered = false;
>             }
>         });
>     }
> }
> {code}
> With these modifications, we still get duplicate messages.
> We think the problem is that the synchronized(deliveredMessages) block
> prevents another ActiveMQConnection thread, the one that clears in-progress
> messages, from calling clearDeliveredList().
> By adding logging, we observed a thread waiting for the
> deliveredMessages lock in the clearDeliveredList() method.
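The contention described above can be reproduced in isolation. In this hypothetical sketch a CountDownLatch stands in for the injected Thread.sleep: one thread holds the deliveredMessages monitor, and a second thread calling clearDeliveredList() stays BLOCKED until the first releases it. The class and field names mirror those in the issue but the code is otherwise illustrative, not ActiveMQ's.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of monitor contention; names mirror the issue, this is not ActiveMQ code.
final class MonitorContentionDemo {
    private final Object deliveredMessages = new Object();
    private volatile boolean cleared;

    // Holds the monitor until released, standing in for the injected Thread.sleep.
    void slowAck(CountDownLatch lockHeld, CountDownLatch release) throws InterruptedException {
        synchronized (deliveredMessages) {
            lockHeld.countDown();
            release.await();
        }
    }

    // Needs the same monitor, so it cannot run while slowAck holds it.
    void clearDeliveredList() {
        synchronized (deliveredMessages) {
            cleared = true;
        }
    }

    // True if the clearing thread was observed BLOCKED and only cleared after the ack released the lock.
    static boolean clearWasBlockedUntilAckReleased() {
        MonitorContentionDemo demo = new MonitorContentionDemo();
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread acker = new Thread(() -> {
            try {
                demo.slowAck(lockHeld, release);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread clearer = new Thread(demo::clearDeliveredList);
        try {
            acker.start();
            lockHeld.await();
            clearer.start();
            // Wait (at most 5 s) until the clearer is parked on the monitor.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (clearer.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            boolean blocked = clearer.getState() == Thread.State.BLOCKED;
            boolean clearedWhileHeld = demo.cleared;
            release.countDown();
            acker.join();
            clearer.join();
            return blocked && !clearedWhileHeld && demo.cleared;
        } catch (InterruptedException e) {
            release.countDown();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```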
>
> Question :
>
> We tried the fixes described in
> https://issues.apache.org/jira/browse/AMQ-5068 and
> https://issues.apache.org/jira/browse/AMQ-3519, but they did not solve
> our problem.
> Is there a workaround or a configuration parameter that can help
> prevent this problem?
>
> We are working on our side to find a fix. One option
> may be to force a rollback of the transaction in ConnectionStateTracker.restoreTransactions()
> if a failover occurs during the prepare phase of the commit.
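The proposed option could look roughly like the following hypothetical model of replaying tracked transactions after a reconnect: any transaction whose prepare was in flight when the transport failed is rolled back instead of replayed. RestoreTransactionsModel and its phases are illustrative and are not the ConnectionStateTracker API. The fix that was eventually committed (see the resolution comment) keys the rollback on pending acks at the moment of the initial disconnect rather than on the prepare phase.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposal; not the ActiveMQ ConnectionStateTracker API.
final class RestoreTransactionsModel {
    enum TxPhase { ACTIVE, PREPARING, PREPARED }

    private final Map<String, TxPhase> tracked = new LinkedHashMap<>();

    void track(String txId, TxPhase phase) {
        tracked.put(txId, phase);
    }

    // Returns the ids to roll back; the remaining transactions are replayed unchanged.
    List<String> restoreAfterFailover() {
        List<String> toRollback = new ArrayList<>();
        for (Map.Entry<String, TxPhase> e : tracked.entrySet()) {
            if (e.getValue() == TxPhase.PREPARING) {
                // The broker may already have redispatched messages acked in this
                // transaction, so replaying it risks a duplicate.
                toRollback.add(e.getKey());
            }
        }
        toRollback.forEach(tracked::remove);
        return toRollback;
    }
}
```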
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)