[ https://issues.apache.org/jira/browse/AMQ-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully resolved AMQ-5854.
-----------------------------
    Resolution: Fixed

I have implemented a rollback-only option on the transaction context that is
triggered by a failover, that is, by the initial disconnect that triggers the
reconnect. If there are pending acks (messages that still need an ack in the
transaction), a rollback is forced. This seems the only sensible option.
There is a new unit test that simulates a delayed or slow ack on the consumer
side, using an additional transaction synchronization, and it reliably
reproduces the missed ack.
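A minimal sketch of the rollback-only idea described above, assuming a simplified context that only tracks the IDs of messages whose ack is still pending. The class and method names are hypothetical; this is not ActiveMQ's TransactionContext and not the code committed for AMQ-5854.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified transaction context (not ActiveMQ's TransactionContext).
class SketchTransactionContext {
    private final List<String> pendingAcks = new ArrayList<>();
    private boolean rollbackOnly;

    // A delivered message whose ack has not yet been sent in this transaction.
    void addPendingAck(String messageId) {
        pendingAcks.add(messageId);
    }

    // Invoked on the initial disconnect that precedes a failover reconnect.
    // The broker may redeliver the unacked messages after the reconnect,
    // so a transaction with pending acks can only be rolled back.
    void onTransportInterrupted() {
        if (!pendingAcks.isEmpty()) {
            rollbackOnly = true;
        }
    }

    boolean isRollbackOnly() {
        return rollbackOnly;
    }

    void rollback() {
        pendingAcks.clear();
        rollbackOnly = false;
    }

    // Commit is refused once the context is rollback-only: roll back and report it.
    void commit() {
        if (rollbackOnly) {
            rollback();
            throw new IllegalStateException("transaction rolled back: failover with pending acks");
        }
        pendingAcks.clear(); // in this sketch the acks are considered sent with the commit
    }
}
```

A transaction without pending acks is unaffected by the interruption; only one that still owes an ack is forced to roll back, which matches the "pending acks" condition in the resolution.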

> Duplicate messages when failover is done during prepare phase of two phase 
> commit.
> ----------------------------------------------------------------------------------
>
>                 Key: AMQ-5854
>                 URL: https://issues.apache.org/jira/browse/AMQ-5854
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.9.1, 5.10.2, 5.11.1
>         Environment: Ubuntu or AIX
> ActiveMQ 5.9.1 (problem is reproduced in 5.10.2 and 5.11.1)
> Spring DMLC
> XA transactions with atomikos 3.7.0 (problem is also reproduced with 3.9.15)
> Persistent messages
> Multithreading (this problem occurs when there are at least 2 consumers on a
> queue)
>            Reporter: Michael
>            Assignee: Gary Tully
>         Attachments: 
> ActiveMQMessageConsumer-5.10.2-ModifyWithThreadSleep.java, 
> ActiveMQMessageConsumer-5.11.1-ModifyWithThreadSleep.java, amq5854.tar.gz
>
>
> Use case:
>                 With Spring DMLC, read a JMS message from a queue, produce a
> JMS message on an output queue and write data to a database.
> Problem description:
>                 Due to high CPU usage, the inactivity monitor closed the
> connections between the clients and the broker while 16 messages were being processed.
> {noformat}
> 2015-06-01 04:39:01,130 | WARN  | Transport Connection to: tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://*** | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker
> {noformat}
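The warning above comes from the broker-side inactivity monitor: a read check fails when nothing at all, not even a keep-alive, was received during a whole interval (30000 ms here). A simplified sketch of such a check, with hypothetical names and without the timer and keep-alive writing of ActiveMQ's real AbstractInactivityMonitor, shows why a peer that stays silent for one interval is disconnected even if it is still alive but overloaded:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical model of an inactivity read check
// (not ActiveMQ's AbstractInactivityMonitor: no timers, no keep-alive writes).
class SketchReadCheck {
    private final long maxInactivityMillis;
    private final AtomicBoolean commandReceived = new AtomicBoolean(false);

    SketchReadCheck(long maxInactivityMillis) {
        this.maxInactivityMillis = maxInactivityMillis;
    }

    // Transport reader thread: called for every command read, keep-alives included.
    void onCommand() {
        commandReceived.set(true);
    }

    // Timer thread: called once per interval. getAndSet reads and clears the
    // flag atomically, so every interval must see at least one new command.
    void check() throws IOException {
        if (!commandReceived.getAndSet(false)) {
            throw new IOException("Channel was inactive for too (>" + maxInactivityMillis + ") long");
        }
    }
}
```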
>                 15 messages were rolled back and redelivered to another
> consumer.
>                 In the log we got 15 warnings:
> {noformat}
> ActiveMQMessageConsumer   |WARN |jmsContainer-173|rolling back transaction (XID:***) post failover recovery. 1 previously delivered message(s) not replayed to consumer: ***
> {noformat}
>                 But one message was not rolled back (its transaction was
> committed) and was also redelivered to another consumer. So it was processed
> twice by two different consumers (two inserts in the database and two output
> JMS messages generated) and was not deduplicated.
>                 In the ActiveMQ log we got the message:
> {noformat}
> WARN  | Async error occurred:  | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///***
>                        javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId = ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, transactionId = XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737], messageCount = 1, poisonCause = null}; Could not find Message-ID ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
> {noformat}
>                 For this duplicated message, the failover occurred during the
> prepare phase of the commit:
> {noformat}
> [{2015/06/01 04:39:50,322 |FailoverTransport         |WARN |jmsContainer-152|Transport (tcp://***) failed, reason:  , attempting to automatically reconnect}]
> org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
>                 at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
>                 at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
>                 at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
>                 at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>                 at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
>                 at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>                 at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>                 at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
>                 at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
>                 at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
>                 at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
>                 at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
>                 at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
>                 at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
>                 at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
>                 at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
>                 at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
>                 at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
>                 at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
>                 at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
>                 at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
>                 at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
>                 at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
>                 at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
>                 at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
>                 at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
>                 at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
>                 at java.lang.Thread.run(Thread.java:761)
> {noformat}
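The trace shows that the consumer's ack is not sent when the listener returns: it is sent from a beforeEnd synchronization (ActiveMQMessageConsumer$5.beforeEnd) when Atomikos ends the branch inside prepare (XAResourceTransaction.prepare, then suspend, then TransactionContext.end). The hypothetical sketch below models only that call ordering, with the XA calls reduced to plain methods, to show that the ack leaves the client inside the prepare window where the failover hit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback, loosely modelled on a transaction synchronization.
interface SketchSynchronization {
    void beforeEnd();
}

// Hypothetical XA branch reproducing only the ordering seen in the trace:
// prepare() first ends the branch, and end() runs the beforeEnd callbacks.
class SketchXaBranch {
    private final List<SketchSynchronization> synchronizations = new ArrayList<>();
    private final List<String> events = new ArrayList<>();

    void addSynchronization(SketchSynchronization s) {
        synchronizations.add(s);
    }

    void record(String event) {
        events.add(event);
    }

    List<String> events() {
        return events;
    }

    void end() {
        record("end");
        for (SketchSynchronization s : synchronizations) {
            s.beforeEnd(); // a deferred ack is sent here
        }
    }

    void prepare() {
        end(); // the transaction manager ends the branch as part of prepare
        record("prepare");
    }
}
```

Registering a synchronization that records "ack sent" and calling prepare() yields the event order end, ack sent, prepare.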
> Our analysis:
>                 We think that the duplicate message is caused by the failover
> during the prepare phase of the commit, so we modified the source code to
> reproduce the case.
>                 Our configuration changes to provoke failovers:
>                                broker: transport.useKeepAlive=false
>                                client: wireFormat.maxInactivityDuration=5000
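The two settings are transport URI options. As a sketch, with placeholder host names and port (not values from the report), they could be written as follows; the broker option belongs on a transportConnector uri, the client option on the nested tcp URI inside the failover URL:

```java
import java.net.URI;

// Sketch of the reproduction settings as transport URIs.
// Hosts and port are placeholders, not taken from the report.
final class ReproConfig {
    // Broker transport connector with keep-alives disabled on the broker side.
    static final URI BROKER_CONNECTOR =
            URI.create("tcp://0.0.0.0:61616?transport.useKeepAlive=false");

    // Client failover URL; the nested tcp transport uses a 5000 ms inactivity limit.
    static final URI CLIENT_URL =
            URI.create("failover:(tcp://broker-host:61616?wireFormat.maxInactivityDuration=5000)");

    private ReproConfig() {
    }
}
```

On the client, the failover string would typically be passed as the broker URL of an ActiveMQConnectionFactory.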
>                 We added a Thread.sleep to the source code of
> org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen
> exactly where we think it causes the problem:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#acknowledge()|borderStyle=solid}
>
>     public void acknowledge() throws JMSException {
>         clearDeliveredList();
>         waitForRedeliveries();
>         synchronized(deliveredMessages) {
>             // BEGIN MODIFIED CODE
>             LOG.warn("start sleeping 20 seconds to test failover");
>             try {
>                 Thread.sleep(1000 * 20);
>             } catch (InterruptedException e) {
>                 LOG.error("Exception :", e);
>             }
>             LOG.warn("end sleeping 20 seconds to test failover");
>             // END MODIFIED CODE
>             // Acknowledge all messages so far.
>             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>             if (ack == null)
>                 return; // no msgs
>             if (session.getTransacted()) {
>                 rollbackOnFailedRecoveryRedelivery();
>                 session.doStartTransaction();
>                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
>             }
>             pendingAck = null;
>             session.sendAck(ack);
>             // Adjust the counters
>             deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
>             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
>             if (!session.getTransacted()) {
>                 deliveredMessages.clear();
>             }
>         }
>     }
> {code}                               
>                 
>                 With these configuration and code changes, the problem is
> easily reproduced.
>                 We also tried with transactedIndividualAck=true and added a
> Thread.sleep to the code:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#registerSync()|borderStyle=solid}
>
>     private void registerSync() throws JMSException {
>         session.doStartTransaction();
>         if (!synchronizationRegistered) {
>             synchronizationRegistered = true;
>             session.getTransactionContext().addSynchronization(new Synchronization() {
>                 @Override
>                 public void beforeEnd() throws Exception {
>                     if (transactedIndividualAck) {
>                         clearDeliveredList();
>                         waitForRedeliveries();
>                         synchronized(deliveredMessages) {
>                             // BEGIN MODIFIED CODE
>                             LOG.warn("start sleeping 20 seconds to test failover");
>                             try {
>                                 Thread.sleep(1000 * 20);
>                             } catch (InterruptedException e) {
>                                 LOG.error("Exception :", e);
>                             }
>                             LOG.warn("end sleeping 20 seconds to test failover");
>                             // END MODIFIED CODE
>                             rollbackOnFailedRecoveryRedelivery();
>                         }
>                     } else {
>                         acknowledge();
>                     }
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterCommit() throws Exception {
>                     commit();
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterRollback() throws Exception {
>                     rollback();
>                     synchronizationRegistered = false;
>                 }
>             });
>         }
>     }
> {code}                                               
>                 With these modifications, we still get duplicate messages.
>                 We think the problem is that the synchronized(deliveredMessages)
> block prevents another ActiveMQConnection thread, the one that clears the
> messages in progress, from calling clearDeliveredList().
>                 By adding logs, we observed that a thread is waiting for the
> deliveredMessages lock in the clearDeliveredList() method.
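The contention described above can be reproduced in isolation with plain Java monitors. In this hypothetical sketch, slowAcknowledge stands in for the modified acknowledge()/beforeEnd() and clearDeliveredList for the method of the same name; neither is ActiveMQ code. While the first thread holds the deliveredMessages monitor, the second thread is parked in the BLOCKED state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the consumer; only the locking pattern is modelled.
class SketchConsumer {
    final List<String> deliveredMessages = new ArrayList<>();

    // Like the modified ack path: holds the monitor while stalled.
    void slowAcknowledge(CountDownLatch lockHeld, long stallMillis) throws InterruptedException {
        synchronized (deliveredMessages) {
            lockHeld.countDown();
            Thread.sleep(stallMillis);
        }
    }

    // Like clearDeliveredList() run by the connection thread on failover.
    void clearDeliveredList() {
        synchronized (deliveredMessages) {
            deliveredMessages.clear();
        }
    }

    // Returns the clearing thread's state while the ack thread holds the monitor.
    static Thread.State observeClearerState(long stallMillis) {
        SketchConsumer consumer = new SketchConsumer();
        consumer.deliveredMessages.add("ID:1");
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread acker = new Thread(() -> {
            try {
                consumer.slowAcknowledge(lockHeld, stallMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            acker.start();
            lockHeld.await(); // the acker now owns the deliveredMessages monitor
            Thread clearer = new Thread(consumer::clearDeliveredList);
            clearer.start();
            // Poll until the clearer is parked on the monitor (or has finished).
            Thread.State state = clearer.getState();
            while (state != Thread.State.BLOCKED && state != Thread.State.TERMINATED) {
                Thread.sleep(5);
                state = clearer.getState();
            }
            acker.join();
            clearer.join();
            return state;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing", e);
        }
    }
}
```

With a stall of a few hundred milliseconds, observeClearerState reports BLOCKED; the clearing thread only proceeds once the stalled thread leaves the synchronized block.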
>                 
> Question:
>                 
>                 We tried the fixes described in
> https://issues.apache.org/jira/browse/AMQ-5068 and
> https://issues.apache.org/jira/browse/AMQ-3519, but they don't solve
> our problem.
>                 Is there a workaround or a configuration parameter that can
> help to prevent this problem?
>                 
>                 We are working on a fix on our side. One option may be to
> force a rollback of the transaction in
> ConnectionStateTracker.restoreTransactions() if a failover occurs during the
> prepare phase of the commit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
