Radek Kraus created AMQ-6891:
--------------------------------

             Summary: Duplicated message in JMS transaction, when jdbc 
persistence fails
                 Key: AMQ-6891
                 URL: https://issues.apache.org/jira/browse/AMQ-6891
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.15.2
            Reporter: Radek Kraus


I have following scenario (see attached test case):
 # Send 1 message in JMS transaction
 # Enable database problem simulation (throw {{SQLException}} in 
{{TransactionContext.executeBatch()}} method - the similar situation should 
happen, when commit fails)
 # Attempt to send 2 messages in one JMS transaction, send operation fails as 
is expected (only 1 message is in database from first send operation)
 # Disable database problem simulation ({{SQLException}} is not thrown from now)
 # Repeat the attempt to send "the same" 2 messages in one JMS transaction, 
send operation is successful now, how is expected (3 messages are in database)
 # Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 3, 
2, 3.

I have suspicion, that problem is in 
{{org.apache.activemq.broker.region.Queue}}. It seems that reason is 
{{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method 
is invoked for each message by {{JDBCMessageStore.addMessage(ConnectionContext, 
Message) method}}, which adds {{MessageContext}} into this list. The added 
{{MessageContext}} is processed (and removed) in 
{{Queue.doPendingCursorAdditions()}} method, which is invoked only from 
"afterCommit synchronization" ({{Queue.CursorAddSync.afterCommit()}} method). 
But when commit operation fails, then "afterCommit" method is not invoked (but 
{{afterRollback}} method is invoked) and {{MessageContext}} entries stays in 
{{indexOrderedCursorUpdates}} list.

Personaly I would expect, that some "remove" operation should be done in 
{{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation 
should be done in {{Queue.doMessageSend()}} method on place, where 
{{Exception}} from "addMessage" is handled in case when JMS transaction is not 
used. Or some different "completion" operation should be introduced, because 
{{MessageContext}} is only add into the list,  but don't removed in case of 
failure.

When I tried to register (and use) {{LeaseLockerIOExceptionHandler 
}}IOExceptionHandler, the transports was successfully restarted, but my 
"client" code was blocked in {{ActiveMQSession.commit()}} method. Is it 
expected behavior?

When I tried to add following code into 
{{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages 
(when JMS transaction is used), but it was only blind shot, sorry, because I 
don't understand the whole logic here.
{code:java}
@Override
public void afterRollback() throws Exception {
  synchronized(indexOrderedCursorUpdates) {
    for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
      MessageContext mc = indexOrderedCursorUpdates.get(i);
        
if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
        indexOrderedCursorUpdates.remove(mc);
        if(mc.onCompletion != null) {
          mc.onCompletion.run();
        }
        break;
      }
    }
  }
  messageContext.message.decrementReferenceCount();
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to