[https://issues.apache.org/jira/browse/AMQ-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Michael updated AMQ-5854:
-------------------------
    Description: 
Use case:
                With a Spring DMLC (DefaultMessageListenerContainer), read a JMS message from a queue, produce a JMS message on an output queue and write data to a database.

Problem description:

                Due to high CPU usage, the inactivity monitor closed the connections between the clients and the broker while 16 messages were being processed.

                2015-06-01 04:39:01,130 | WARN  | Transport Connection to: tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://*** | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker

                15 of these messages were rolled back and redelivered to another consumer.

                In the log we got 15 warnings:
                ActiveMQMessageConsumer   |WARN |jmsContainer-173|rolling back transaction (XID:***) post failover recovery. 1 previously delivered message(s) not replayed to consumer: ***

                But one message is not rolled back (its transaction is committed) and is nevertheless redelivered to another consumer. It is therefore processed twice by two different consumers (two inserts in the database and two output JMS messages generated) and is not deduplicated.
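
                For illustration only: an idempotent receiver that records each processed JMSMessageID before doing any work would likely have skipped this redelivery. The sketch below is a hypothetical, self-contained model, not ActiveMQ or Spring API (IdempotentReceiver and its methods are invented). It assumes that in a real deployment the processed-ID record would be a database row with a unique key, written in the same XA transaction as the business insert; an in-memory set is used here only to keep it runnable.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent-receiver sketch (not ActiveMQ or Spring API).
class IdempotentReceiver {
    // Assumption: in production this would be a database table with a unique key on
    // the message ID, written in the same XA transaction as the business insert.
    private final Set<String> processedIds = new HashSet<>();
    private int sideEffects = 0;

    /** Returns true if the message was processed, false if it was skipped as a duplicate. */
    synchronized boolean onMessage(String jmsMessageId) {
        if (!processedIds.add(jmsMessageId)) {
            return false; // already processed: no second insert, no second output message
        }
        sideEffects++;    // stands in for "insert into database + send output JMS message"
        return true;
    }

    synchronized int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentReceiver receiver = new IdempotentReceiver();
        String id = "ID:example-1:1:1:1:1";          // hypothetical message ID
        System.out.println(receiver.onMessage(id));  // true: first delivery
        System.out.println(receiver.onMessage(id));  // false: redelivery after failover
        System.out.println(receiver.sideEffects());  // 1
    }
}
```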

                In the ActiveMQ broker log we got this message:

                WARN  | Async error occurred:  | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///***
                       javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId = ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, transactionId = XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737], messageCount = 1, poisonCause = null}; Could not find Message-ID ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)


                For this duplicated message, the failover occurred during the prepare phase of the commit:


[{2015/06/01 04:39:50,322 |FailoverTransport         |WARN |jmsContainer-152|Transport (tcp://***) failed, reason:  , attempting to automatically reconnect}]
org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
        at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
        at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
        at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
        at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
        at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
        at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
        at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
        at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
        at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
        at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
        at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
        at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
        at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
        at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
        at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
        at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
        at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
        at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
        at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
        at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
        at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
        at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
        at java.lang.Thread.run(Thread.java:761)


Our analysis:

                We think the duplicate message is caused by the failover occurring during the prepare phase of the commit, so we modified the source code to reproduce the case.

                Our configuration changes to provoke failovers:
                               broker: transport.useKeepAlive=false
                               client: wireFormat.maxInactivityDuration=5000
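
                A sketch of where these two settings would go, assuming a plain TCP transport connector on the broker and a failover URL on the client (neither URI appears in this report; the host broker-host and port 61616 are placeholders, and FailoverTestConfig is a hypothetical helper):

```java
// Builds the two URIs corresponding to the configuration changes above.
// Host name and port are hypothetical placeholders.
class FailoverTestConfig {

    /** Broker side: transport connector URI with broker keep-alives disabled. */
    static String brokerConnectorUri(int port) {
        return "tcp://0.0.0.0:" + port + "?transport.useKeepAlive=false";
    }

    /** Client side: failover URL whose nested TCP transport uses a short inactivity timeout (ms). */
    static String clientFailoverUrl(String host, int port, long maxInactivityMs) {
        return "failover:(tcp://" + host + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMs + ")";
    }

    public static void main(String[] args) {
        // Broker URI: e.g. the uri attribute of a <transportConnector> in activemq.xml.
        // Client URL: e.g. the argument of new ActiveMQConnectionFactory(String brokerURL).
        System.out.println(brokerConnectorUri(61616));
        System.out.println(clientFailoverUrl("broker-host", 61616, 5000));
    }
}
```

                Putting wireFormat.maxInactivityDuration inside the parentheses applies it to the nested TCP transport rather than to the failover transport itself.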


                We added a Thread.sleep to the source code of org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen exactly where we think it causes the problem:

{code:title=org.apache.activemq.ActiveMQMessageConsumer#acknowledge()|borderStyle=solid}
public void acknowledge() throws JMSException {
    clearDeliveredList();
    waitForRedeliveries();
    synchronized(deliveredMessages) {

        // BEGIN MODIFIED CODE: delay the ack by 20 s while holding the
        // deliveredMessages lock, so that the 5 s inactivity timeout
        // triggers a failover at this point
        LOG.warn("start sleeping 20 seconds to test failover");
        try {
            Thread.sleep(1000 * 20);
        } catch (InterruptedException e) {
            LOG.error("Exception :", e);
        }
        LOG.warn("end sleeping 20 seconds to test failover");
        // END MODIFIED CODE

        // Acknowledge all messages so far.
        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
        if (ack == null)
            return; // no msgs

        if (session.getTransacted()) {
            rollbackOnFailedRecoveryRedelivery();
            session.doStartTransaction();
            ack.setTransactionId(session.getTransactionContext().getTransactionId());
        }

        pendingAck = null;
        session.sendAck(ack);

        // Adjust the counters
        deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());

        if (!session.getTransacted()) {
            deliveredMessages.clear();
        }
    }
}
{code}

                With these configuration and code changes, the problem is easily reproduced.

                We also tried with transactedIndividualAck=true, adding a Thread.sleep in this code path:

{code:title=org.apache.activemq.ActiveMQMessageConsumer#registerSync()|borderStyle=solid}
private void registerSync() throws JMSException {
    session.doStartTransaction();
    if (!synchronizationRegistered) {
        synchronizationRegistered = true;
        session.getTransactionContext().addSynchronization(new Synchronization() {
            @Override
            public void beforeEnd() throws Exception {
                if (transactedIndividualAck) {
                    clearDeliveredList();
                    waitForRedeliveries();
                    synchronized(deliveredMessages) {

                        // BEGIN MODIFIED CODE
                        LOG.warn("start sleeping 20 seconds to test failover");
                        try {
                            Thread.sleep(1000 * 20);
                        } catch (InterruptedException e) {
                            LOG.error("Exception :", e);
                        }
                        LOG.warn("end sleeping 20 seconds to test failover");
                        // END MODIFIED CODE

                        rollbackOnFailedRecoveryRedelivery();
                    }
                } else {
                    acknowledge();
                }
                synchronizationRegistered = false;
            }

            @Override
            public void afterCommit() throws Exception {
                commit();
                synchronizationRegistered = false;
            }

            @Override
            public void afterRollback() throws Exception {
                rollback();
                synchronizationRegistered = false;
            }
        });
    }
}
{code}
                With these modifications, we still get duplicate messages.

                We think the problem is that synchronized(deliveredMessages) blocks the call to clearDeliveredList() made by another ActiveMQConnection thread, which is supposed to clear the messages in progress.
                By adding logs, we observed a thread waiting for the deliveredMessages lock in clearDeliveredList().
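
                The suspected interaction can be reproduced in isolation with plain Java. This is a hypothetical model of the locking pattern only, not ActiveMQ code (DeliveredListRace and its methods are invented, and the 200 ms sleep stands in for the slow commit): the consumer thread builds its ack while holding the deliveredMessages monitor, so the connection thread's clear can only run afterwards, when the ack has already been built from the stale list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the suspected locking pattern; this is NOT ActiveMQ code.
class DeliveredListRace {
    final List<String> deliveredMessages = new ArrayList<>();
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Consumer thread: like acknowledge() with the injected sleep, it builds the ack
    // while holding the deliveredMessages monitor.
    void acknowledge(CountDownLatch lockHeld) throws InterruptedException {
        synchronized (deliveredMessages) {
            lockHeld.countDown();  // signal that the monitor is now held
            Thread.sleep(200);     // stands in for the slow commit / injected sleep
            events.add("ack sent for " + deliveredMessages);
        }
    }

    // Connection thread: like clearDeliveredList() after a transport interruption.
    void clearDeliveredList() {
        events.add("clear requested");
        synchronized (deliveredMessages) { // blocks until acknowledge() releases the monitor
            deliveredMessages.clear();
            events.add("cleared");
        }
    }

    static List<String> run() {
        DeliveredListRace race = new DeliveredListRace();
        race.deliveredMessages.add("msg-1");
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread consumer = new Thread(() -> {
            try {
                race.acknowledge(lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread connection = new Thread(race::clearDeliveredList);
        try {
            consumer.start();
            lockHeld.await();      // the consumer owns the monitor from here on
            connection.start();
            consumer.join();
            connection.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(race.events);
    }

    public static void main(String[] args) {
        // Typically prints [clear requested, ack sent for [msg-1], cleared]:
        // the ack still carries msg-1 although a clear was already requested.
        System.out.println(run());
    }
}
```

                Only the relative order of the ack and the clear is guaranteed here; whether "clear requested" is logged before the ack depends on thread scheduling.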

                
Question:

                We tried the fixes described in https://issues.apache.org/jira/browse/AMQ-5068 and https://issues.apache.org/jira/browse/AMQ-3519, but they do not solve our problem.
                Is there a workaround or a configuration parameter that can prevent this problem?
                
                We are working on a fix on our side. One option may be to force a rollback of the transaction in ConnectionStateTracker.restoreTransactions() if a failover occurs during the prepare phase of the commit.
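
                The option above could be sketched as the following decision rule. This is a hypothetical model under stated assumptions, not the real ConnectionStateTracker: the Phase and Outcome enums and the method names are invented, and it assumes the tracker can tell that a transaction's prepare was in flight when the transport failed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the proposed rule; not the real ConnectionStateTracker API.
class RestoreTransactionsSketch {

    enum Phase { ACTIVE, PREPARING, PREPARED }   // last phase seen before the failover
    enum Outcome { KEEP_EXISTING_BEHAVIOUR, FORCE_ROLLBACK }

    private final Map<String, Phase> tracked = new LinkedHashMap<>();

    void track(String xid, Phase phase) {
        tracked.put(xid, phase);
    }

    // Called after the failover transport has reconnected.
    Map<String, Outcome> restoreTransactions() {
        Map<String, Outcome> decisions = new LinkedHashMap<>();
        for (Map.Entry<String, Phase> entry : tracked.entrySet()) {
            // The failover interrupted the prepare phase: acks sent from end() may not
            // have been matched by the broker, so roll back rather than let it commit.
            Outcome outcome = entry.getValue() == Phase.PREPARING
                    ? Outcome.FORCE_ROLLBACK
                    : Outcome.KEEP_EXISTING_BEHAVIOUR;
            decisions.put(entry.getKey(), outcome);
        }
        return decisions;
    }

    public static void main(String[] args) {
        RestoreTransactionsSketch tracker = new RestoreTransactionsSketch();
        tracker.track("xid-1", Phase.ACTIVE);
        tracker.track("xid-2", Phase.PREPARING);
        tracker.track("xid-3", Phase.PREPARED);
        // {xid-1=KEEP_EXISTING_BEHAVIOUR, xid-2=FORCE_ROLLBACK, xid-3=KEEP_EXISTING_BEHAVIOUR}
        System.out.println(tracker.restoreTransactions());
    }
}
```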





> Duplicate messages when failover is done during prepare phase of two phase 
> commit.
> ----------------------------------------------------------------------------------
>
>                 Key: AMQ-5854
>                 URL: https://issues.apache.org/jira/browse/AMQ-5854
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.9.1, 5.10.2, 5.11.1
>         Environment: Ubuntu or AIX
> ActiveMQ 5.9.1 (problem is reproduced in 5.10.2 and 5.11.1)
> Spring DMLC
> XA transactions with atomikos 3.7.0 (problem is also reproduced with 3.9.15)
> Persistent messages
> Multithreading (this problem occurs when there are at least 2 consumers on a 
> queue)
>            Reporter: Michael
>
> Use case :
>                 With Spring DMLC, Read a jms message in a queue, produce a 
> jms message in an output queue and write data in database.
> Problem description :
>                 Due to hight CPU usage, the inactity monitor closes 
> connections between clients and broker while 16 messages were processed.
>                 2015-06-01 04:39:01,130 | WARN  | Transport Connection to: 
> tcp://*** failed: org.apache.activemq.transport.InactivityIOException: 
> Channel was inactive for too (>30000) long: tcp://*** | 
> org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ 
> InactivityMonitor Worker
>                 15 messages are rolled back and redilevered to another 
> consummer.
>                 In the log we got 15 warnings :
>                 ActiveMQMessageConsumer   |WARN |jmsContainer-173|rolling 
> back transaction (XID:***) post failover recovery. 1 previously delivered 
> message(s) not replayed to consumer: ***
>                 But one message is not rolled back (the transaction commit) 
> and is also redileverd to another consummer. So it's processed twice by two 
> different consummers (two inserts in database and two output JMS messages 
> generated) and is not deduplicated.
>                 In the activeMq log we got the message :
>                 WARN  | Async error occurred:  | 
> org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: 
> tcp:///***
>                        javax.jms.JMSException: Unmatched acknowledge: 
> MessageAck {commandId = 6665, responseRequired = false, ackType = 2, 
> consumerId = ID:***, firstMessageId = 
> ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = 
> ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, 
> transactionId = 
> XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737],
>  messageCount = 1, poisonCause = null}; Could not find Message-ID 
> ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
>                 For this duplicated message, the failover occur during 
> prepare phase of commit :
> [{2015/06/01 04:39:50,322 |FailoverTransport         |WARN 
> |jmsContainer-152|Transport (tcp://***) failed, reason:  , attempting to 
> automatically reconnect}]
> org.apache.activemq.transport.InactivityIOException: Cannot send, channel has 
> already failed: ***
>                 at 
> org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
>                 at 
> org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
>                 at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
>                 at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>                 at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
>                 at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>                 at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>                 at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
>                 at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
>                 at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
>                 at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
>                 at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
>                 at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
>                 at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
>                 at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
>                 at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
>                 at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
>                 at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
>                 at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
>                 at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
>                 at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
>                 at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
>                 at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
>                 at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
>                 at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
>                 at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
>                 at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
>                 at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
>                 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
>                 at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
>                 at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
>                 at java.lang.Thread.run(Thread.java:761)
> Our analysis:
>                 We think the duplicate message is caused by a failover during the prepare phase of the commit, so we modified the source code to reproduce the case.
>                 Configuration changes we made to provoke failovers:
>                                broker : transport.useKeepAlive=false
>                                client : wireFormat.maxInactivityDuration=5000
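For anyone reproducing this, the two settings above can be written as transport URIs. The sketch below only composes the strings with the JDK; the class name `FailoverReproConfig`, the host names and port 61616 are placeholders, and the assumption that the client option belongs on the nested `tcp://` URI inside `failover:(...)` should be checked against the ActiveMQ failover transport documentation for the version in use.

```java
import java.net.URI;

/**
 * Hypothetical helper (not part of ActiveMQ) that builds the two URIs used
 * in the reproduction. Host names and ports are placeholders.
 */
final class FailoverReproConfig {

    private FailoverReproConfig() {}

    /** Broker transportConnector URI: stop sending KeepAliveInfo on idle connections. */
    static URI brokerConnectorUri(String bindHost, int port) {
        return URI.create("tcp://" + bindHost + ":" + port + "?transport.useKeepAlive=false");
    }

    /**
     * Client connection URI: the wireFormat option is placed on the nested tcp URI
     * inside failover:(...) (assumption) so it applies to each underlying connection.
     */
    static String clientFailoverUri(String brokerHost, int port, long maxInactivityMillis) {
        return "failover:(tcp://" + brokerHost + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMillis + ")";
    }
}
```

With keep-alives disabled on the broker and a 5000 ms inactivity limit on the client, a client thread that stays busy longer than that limit is likely to see its connection dropped and failover triggered.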
>                 We added a Thread.sleep to the source code of org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen exactly where we think it causes the problem:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#acknowledge()|borderStyle=solid}
>
>     public void acknowledge() throws JMSException {
>         clearDeliveredList();
>         waitForRedeliveries();
>         synchronized (deliveredMessages) {
>             // BEGIN MODIFIED CODE
>             LOG.warn("start sleeping 20 seconds to test failover");
>             try {
>                 Thread.sleep(1000 * 20);
>             } catch (InterruptedException e) {
>                 LOG.error("Exception :", e);
>             }
>             LOG.warn("end sleeping 20 seconds to test failover");
>             // END MODIFIED CODE
>             // Acknowledge all messages so far.
>             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>             if (ack == null)
>                 return; // no msgs
>             if (session.getTransacted()) {
>                 rollbackOnFailedRecoveryRedelivery();
>                 session.doStartTransaction();
>                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
>             }
>             pendingAck = null;
>             session.sendAck(ack);
>             // Adjust the counters
>             deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
>             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
>             if (!session.getTransacted()) {
>                 deliveredMessages.clear();
>             }
>         }
>     }
> {code}                               
>                 
>                 With these configuration and code changes, the problem is easy to reproduce.
>                 We also tried with transactedIndividualAck=true, adding a Thread.sleep in the following code:
> {code:title=org.apache.activemq.ActiveMQMessageConsumer#registerSync()|borderStyle=solid}
>
>     private void registerSync() throws JMSException {
>         session.doStartTransaction();
>         if (!synchronizationRegistered) {
>             synchronizationRegistered = true;
>             session.getTransactionContext().addSynchronization(new Synchronization() {
>                 @Override
>                 public void beforeEnd() throws Exception {
>                     if (transactedIndividualAck) {
>                         clearDeliveredList();
>                         waitForRedeliveries();
>                         synchronized (deliveredMessages) {
>                             // BEGIN MODIFIED CODE
>                             LOG.warn("start sleeping 20 seconds to test failover");
>                             try {
>                                 Thread.sleep(1000 * 20);
>                             } catch (InterruptedException e) {
>                                 LOG.error("Exception :", e);
>                             }
>                             LOG.warn("end sleeping 20 seconds to test failover");
>                             // END MODIFIED CODE
>                             rollbackOnFailedRecoveryRedelivery();
>                         }
>                     } else {
>                         acknowledge();
>                     }
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterCommit() throws Exception {
>                     commit();
>                     synchronizationRegistered = false;
>                 }
>                 @Override
>                 public void afterRollback() throws Exception {
>                     rollback();
>                     synchronizationRegistered = false;
>                 }
>             });
>         }
>     }
> {code}                                               
>                 With these modifications, we still get duplicate messages.
>                 We think the problem is that the synchronized(deliveredMessages) block prevents another ActiveMQConnection thread from calling clearDeliveredList(), which would clear the messages in progress.
>                 By adding logs, we observed a thread waiting for the deliveredMessages lock in the clearDeliveredList() method.
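The suspected contention can be modelled without ActiveMQ. The sketch below is a hypothetical stand-in, not ActiveMQ code: one thread holds the monitor of a shared list, as acknowledge() and beforeEnd() do with synchronized(deliveredMessages), while a second thread that wants to clear the list, in the role of clearDeliveredList(), stays BLOCKED until the first releases it. The class and thread names are invented for illustration, and a latch replaces the 20-second sleep so the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the lock contention; names do not come from ActiveMQ. */
final class DeliveredListContention {

    private DeliveredListContention() {}

    /** Returns the state the clearing thread was observed in while the lock was held. */
    static Thread.State observeClearerState() throws InterruptedException {
        final List<String> deliveredMessages = new ArrayList<>();
        deliveredMessages.add("msg-1");

        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch releaseLock = new CountDownLatch(1);

        // Plays the listener thread inside acknowledge()/beforeEnd().
        Thread listener = new Thread(() -> {
            synchronized (deliveredMessages) {
                lockHeld.countDown();
                try {
                    // Plays the Thread.sleep(20 s) added for the test.
                    releaseLock.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "listener");

        // Plays the connection thread calling clearDeliveredList() on failover.
        Thread clearer = new Thread(() -> {
            synchronized (deliveredMessages) {
                deliveredMessages.clear();
            }
        }, "transport");

        listener.start();
        lockHeld.await();          // the listener now owns the monitor
        clearer.start();

        // Poll until the clearer is parked on the monitor, giving up after about 2 s.
        Thread.State observed = clearer.getState();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (observed != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(10);
            observed = clearer.getState();
        }

        releaseLock.countDown();   // let the listener leave the synchronized block
        listener.join();
        clearer.join();
        return observed;
    }
}
```

Calling `DeliveredListContention.observeClearerState()` should return `BLOCKED`, which corresponds to the thread seen waiting in clearDeliveredList() in the logs.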
>                 
> Question:
>                 
>                 We tried the fixes described in https://issues.apache.org/jira/browse/AMQ-5068 and https://issues.apache.org/jira/browse/AMQ-3519, but they do not solve our problem.
>                 Is there a workaround or a configuration parameter that can help prevent this problem?
>                 
>                 We are working on a fix on our side. One option may be to force a rollback of the transaction in ConnectionStateTracker.restoreTransactions() if a failover occurs during the prepare phase of the commit.
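The proposed option can be sketched as a small decision rule. This is a hypothetical illustration only: `PrepareFailoverPolicy`, its phases and its decisions are invented names and do not correspond to ConnectionStateTracker's real fields or methods. The idea is to record when a transaction enters the prepare phase and, when state is restored after reconnecting, roll it back instead of restoring it as before.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical policy; not ActiveMQ API. */
final class PrepareFailoverPolicy {

    /** How far a tracked transaction had progressed when the transport failed. */
    enum Phase { ACTIVE, PREPARING }

    /** What the restore step should do with the transaction after reconnecting. */
    enum Decision { RESTORE_AS_BEFORE, FORCE_ROLLBACK }

    private final Map<String, Phase> phaseByTxId = new ConcurrentHashMap<>();

    /** Record that a transaction entered the given phase (e.g. when end/prepare starts). */
    void markPhase(String txId, Phase phase) {
        phaseByTxId.put(txId, phase);
    }

    /** Forget a transaction once it has been committed or rolled back. */
    void forget(String txId) {
        phaseByTxId.remove(txId);
    }

    /** On reconnect: a transaction interrupted while preparing is rolled back, not restored. */
    Decision onRestore(String txId) {
        Phase phase = phaseByTxId.getOrDefault(txId, Phase.ACTIVE);
        return phase == Phase.PREPARING ? Decision.FORCE_ROLLBACK : Decision.RESTORE_AS_BEFORE;
    }
}
```

Plugging a rule like this into restoreTransactions() would still require deciding how the XA transaction manager (Atomikos here) learns about the forced rollback, which this sketch does not cover.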



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
