Michael created AMQ-5854:
----------------------------
Summary: Duplicate messages when failover is done during prepare
phase of two phase commit.
Key: AMQ-5854
URL: https://issues.apache.org/jira/browse/AMQ-5854
Project: ActiveMQ
Issue Type: Bug
Components: Broker, JMS client
Affects Versions: 5.11.1, 5.10.2, 5.9.1
Environment: Ubuntu or AIX
ActiveMQ 5.9.1 (problem is reproduced in 5.10.2 and 5.11.1)
Spring DMLC
XA transactions with atomikos 3.7.0 (problem is also reproduced with 3.9.15)
Persistent messages
Multithreading (this problem occurs when there are at least 2 consumers on a
queue)
Reporter: Michael
Use case:
With Spring DMLC, read a JMS message from a queue, produce a JMS
message on an output queue and write data to the database.
Problem description:
Due to high CPU usage, the inactivity monitor closed the connections
between clients and broker while 16 messages were being processed.
2015-06-01 04:39:01,130 | WARN | Transport Connection to:
tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel
was inactive for too (>30000) long: tcp://*** |
org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ
InactivityMonitor Worker
15 messages are rolled back and redelivered to another
consumer.
In the log we get 15 warnings:
ActiveMQMessageConsumer |WARN |jmsContainer-173|rolling back
transaction (XID:***) post failover recovery. 1 previously delivered message(s)
not replayed to consumer: ***
But one message is not rolled back (its transaction commits) and
is also redelivered to another consumer. So it is processed twice by two
different consumers (two inserts in the database and two output JMS messages
generated) and is not deduplicated.
In the ActiveMQ log we get the message:
WARN | Async error occurred: |
org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport:
tcp:///***
javax.jms.JMSException: Unmatched acknowledge:
MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId
= ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1,
lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination =
queue://***, transactionId =
XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737],
messageCount = 1, poisonCause = null}; Could not find Message-ID
ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
For this duplicated message, the failover occurs during the prepare
phase of the commit:
[{2015/06/01 04:39:50,322 |FailoverTransport |WARN |jmsContainer-152|Transport (tcp://***) failed, reason: , attempting to automatically reconnect}]
org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
    at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
    at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
    at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
    at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
    at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
    at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
    at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
    at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
    at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
    at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
    at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
    at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
    at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
    at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
    at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
    at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
    at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
    at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
    at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
    at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
    at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
    at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
    at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
    at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
    at java.lang.Thread.run(Thread.java:761)
Our analysis:
We think that the duplicate message is caused by the failover
during the prepare phase of the commit, so we modified the source code to
reproduce the case.
Our modifications to the configuration to provoke failovers:
broker : transport.useKeepAlive=false
client : wireFormat.maxInactivityDuration=5000
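For readers who want to reproduce the setup, the two options above are usually carried as URI query parameters: the broker option on the transportConnector URI, the client option on the tcp URI nested inside the failover URI. The sketch below only builds the two strings. The host, port, bind address and class name are assumptions, since the report masks the real broker address.

```java
final class FailoverReproConfig {
    // Hypothetical host and port; the report masks the real address as tcp://***.
    static final String BROKER_HOST = "localhost";
    static final int BROKER_PORT = 61616;

    /** Broker side: transportConnector URI with keep-alive messages disabled. */
    static String brokerConnectorUri() {
        return "tcp://0.0.0.0:" + BROKER_PORT + "?transport.useKeepAlive=false";
    }

    /** Client side: failover URI whose nested tcp transport allows 5 s of inactivity. */
    static String clientBrokerUrl() {
        return "failover:(tcp://" + BROKER_HOST + ":" + BROKER_PORT
                + "?wireFormat.maxInactivityDuration=5000)";
    }

    public static void main(String[] args) {
        System.out.println("broker transportConnector uri: " + brokerConnectorUri());
        System.out.println("client brokerURL: " + clientBrokerUrl());
    }
}
```

With keep-alives off and a 5 second inactivity limit, any pause longer than 5 seconds on an idle connection should trigger the InactivityIOException and a failover reconnect.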
We added a Thread.sleep to the source code of
org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen
exactly where we think it causes problems:
org.apache.activemq.ActiveMQMessageConsumer#acknowledge()
public void acknowledge() throws JMSException {
    clearDeliveredList();
    waitForRedeliveries();
    synchronized(deliveredMessages) {
        // BEGIN MODIFIED CODE
        LOG.warn("start sleeping 20 seconds to test failover");
        try {
            Thread.sleep(1000 * 20);
        } catch (InterruptedException e) {
            LOG.error("Exception :", e);
        }
        LOG.warn("end sleeping 20 seconds to test failover");
        // END MODIFIED CODE
        // Acknowledge all messages so far.
        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
        if (ack == null)
            return; // no msgs
        if (session.getTransacted()) {
            rollbackOnFailedRecoveryRedelivery();
            session.doStartTransaction();
            ack.setTransactionId(session.getTransactionContext().getTransactionId());
        }
        pendingAck = null;
        session.sendAck(ack);
        // Adjust the counters
        deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
        if (!session.getTransacted()) {
            deliveredMessages.clear();
        }
    }
}
With these changes to the configuration and the code, the
problem is easily reproduced.
We also tried with transactedIndividualAck=true, adding a
Thread.sleep in the code:
private void registerSync() throws JMSException {
    session.doStartTransaction();
    if (!synchronizationRegistered) {
        synchronizationRegistered = true;
        session.getTransactionContext().addSynchronization(new Synchronization() {
            @Override
            public void beforeEnd() throws Exception {
                if (transactedIndividualAck) {
                    clearDeliveredList();
                    waitForRedeliveries();
                    synchronized(deliveredMessages) {
                        // BEGIN MODIFIED CODE
                        LOG.warn("start sleeping 20 seconds to test failover");
                        try {
                            Thread.sleep(1000 * 20);
                        } catch (InterruptedException e) {
                            LOG.error("Exception :", e);
                        }
                        LOG.warn("end sleeping 20 seconds to test failover");
                        // END MODIFIED CODE
                        rollbackOnFailedRecoveryRedelivery();
                    }
                } else {
                    acknowledge();
                }
                synchronizationRegistered = false;
            }
            @Override
            public void afterCommit() throws Exception {
                commit();
                synchronizationRegistered = false;
            }
            @Override
            public void afterRollback() throws Exception {
                rollback();
                synchronizationRegistered = false;
            }
        });
    }
}
With these modifications, we still get duplicate messages.
We think the problem is that the
synchronized(deliveredMessages) statement prevents the call to clearDeliveredList() by
another ActiveMQConnection thread, which would clear the messages in progress.
By adding logs we observed that a thread is waiting for the
deliveredMessages lock in the clearDeliveredList() method.
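The suspected interleaving can be illustrated outside ActiveMQ with plain Java monitors. The sketch below is a simplified stand-in, not ActiveMQ code: the class, the Result type and the latch that replaces the 20 second sleep are all hypothetical. It shows one thread holding the deliveredMessages monitor while a second thread that wants to clear the list stays BLOCKED, so the first thread still acknowledges the stale entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class DeliveredListContention {
    /** Outcome of one simulated run. */
    static final class Result {
        final boolean clearWasBlocked;
        final int ackedCount;
        final int remainingAfterClear;
        Result(boolean clearWasBlocked, int ackedCount, int remainingAfterClear) {
            this.clearWasBlocked = clearWasBlocked;
            this.ackedCount = ackedCount;
            this.remainingAfterClear = remainingAfterClear;
        }
    }

    // Stand-in for the consumer's deliveredMessages list (hypothetical).
    private final List<String> deliveredMessages = new ArrayList<>();

    /** Mimics acknowledge(): holds the monitor across the failover window, then acks what it sees. */
    int acknowledge(CountDownLatch lockHeld, CountDownLatch resume) throws InterruptedException {
        synchronized (deliveredMessages) {
            lockHeld.countDown();
            resume.await();                  // plays the role of the Thread.sleep in the patched code
            return deliveredMessages.size(); // the stale entry is still present and gets acked
        }
    }

    /** Mimics clearDeliveredList() called by the connection thread after a transport interruption. */
    void clearDeliveredList() {
        synchronized (deliveredMessages) {
            deliveredMessages.clear();
        }
    }

    static Result demo() {
        DeliveredListContention consumer = new DeliveredListContention();
        consumer.deliveredMessages.add("msg-1");
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);
        int[] acked = new int[1];
        Thread ackThread = new Thread(() -> {
            try {
                acked[0] = consumer.acknowledge(lockHeld, resume);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ack-thread");
        Thread clearThread = new Thread(consumer::clearDeliveredList, "connection-thread");
        try {
            ackThread.start();
            lockHeld.await();                // the ack thread now owns the monitor
            clearThread.start();
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (clearThread.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(5);             // wait until the clear is parked on the monitor
            }
            boolean blocked = clearThread.getState() == Thread.State.BLOCKED;
            resume.countDown();              // let acknowledge() proceed
            ackThread.join();
            clearThread.join();
            return new Result(blocked, acked[0], consumer.deliveredMessages.size());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Result r = demo();
        System.out.println("clear blocked: " + r.clearWasBlocked + ", acked: " + r.ackedCount
                + ", remaining after clear: " + r.remainingAfterClear);
    }
}
```

In this model the clear only runs after the acknowledgement has already been computed, which matches the observation that a thread waits for the lock inside clearDeliveredList(). Whether this is the complete cause in the real client is not established here.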
Question:
We tried the fixes described in
https://issues.apache.org/jira/browse/AMQ-5068 and
https://issues.apache.org/jira/browse/AMQ-3519 but they do not solve our
problem.
Is there a workaround or a configuration parameter that can help
prevent this problem?
We are working on our side to find a fix. One option may
be to force a rollback of the transaction if a failover occurs during the prepare
phase of the commit in ConnectionStateTracker.restoreTransactions().
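The idea of forcing a rollback can be sketched as a small state machine. This is only an illustration of the proposal, not the ConnectionStateTracker API: the class, its states and its method names are hypothetical, and transaction ids are plain strings. A transaction that is between the start and the end of prepare when a failover is detected gets marked rollback-only, and its prepare then refuses to vote commit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PrepareFailoverGuard {
    enum State { ACTIVE, PREPARING, PREPARED, ROLLBACK_ONLY }

    // Transaction id -> current state (hypothetical bookkeeping).
    private final Map<String, State> transactions = new ConcurrentHashMap<>();

    void begin(String xid) {
        transactions.put(xid, State.ACTIVE);
    }

    /** Marks the start of the prepare phase (the end()/beforeEnd() window in the report). */
    void startPrepare(String xid) {
        transactions.replace(xid, State.ACTIVE, State.PREPARING);
    }

    /** Called when a transport failover is detected: poison every transaction caught mid-prepare. */
    void onFailover() {
        transactions.replaceAll((xid, state) ->
                state == State.PREPARING ? State.ROLLBACK_ONLY : state);
    }

    /** Returns true if prepare may vote commit, false if the transaction must roll back. */
    boolean finishPrepare(String xid) {
        return transactions.replace(xid, State.PREPARING, State.PREPARED);
    }

    State stateOf(String xid) {
        return transactions.get(xid);
    }

    public static void main(String[] args) {
        PrepareFailoverGuard guard = new PrepareFailoverGuard();
        guard.begin("tx-1");
        guard.startPrepare("tx-1");
        guard.onFailover(); // failover arrives while tx-1 is preparing
        System.out.println("tx-1 may commit: " + guard.finishPrepare("tx-1"));
        System.out.println("tx-1 state: " + guard.stateOf("tx-1"));
    }
}
```

In a real XA resource the refusal would presumably surface to the transaction manager as a rollback vote from prepare, so that the database insert and the output message are rolled back together with the redelivered input message.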