[ https://issues.apache.org/jira/browse/AMQ-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340992#comment-16340992 ]
Gary Tully commented on AMQ-6891:
---------------------------------

[~rkraus] thanks for the great test and analysis. I think you have identified the problem: the lack of work on the sync rollback. I am just wondering whether the hashCode/equals of messageContext should be implemented such that indexOrderedCursorUpdates.remove() would be sufficient, and I am also wondering about calling the completion in the rollback case, though it may be necessary for cleanup. I will investigate some more. Thanks again for the great bug report :-)

> Duplicated message in JMS transaction, when jdbc persistence fails (Memory leak on Queue)
> -----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6891
>                 URL: https://issues.apache.org/jira/browse/AMQ-6891
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Radek Kraus
>            Assignee: Gary Tully
>            Priority: Major
>         Attachments: JmsTransactionCommitFailureTest.java
>
>
> I have the following scenario (see attached test case):
> # Send 1 message in a JMS transaction.
> # Enable database problem simulation (throw {{SQLException}} in the {{TransactionContext.executeBatch()}} method; a similar situation should occur when the commit fails).
> # Attempt to send 2 messages in one JMS transaction. The send operation fails, as expected (only the 1 message from the first send operation is in the database).
> # Disable database problem simulation ({{SQLException}} is no longer thrown).
> # Repeat the attempt to send "the same" 2 messages in one JMS transaction. The send operation now succeeds, as expected (3 messages are in the database).
> # Attempt to receive 3 messages (1, 2, 3), but 5 messages are received: 1, 2, 3, 2, 3.
> I suspect that the problem is in {{org.apache.activemq.broker.region.Queue}}. The cause seems to be the {{indexOrderedCursorUpdates}} list.
> The {{Queue.onAdd(MessageContext)}} method, which adds the {{MessageContext}} to this list, is invoked for each message by the {{JDBCMessageStore.addMessage(ConnectionContext, Message)}} method. The added {{MessageContext}} is processed (and removed) in the {{Queue.doPendingCursorAdditions()}} method, which is invoked only from the "afterCommit synchronization" (the {{Queue.CursorAddSync.afterCommit()}} method). But when the commit operation fails, the "afterCommit" method is not invoked (the {{afterRollback}} method is invoked instead), and the {{MessageContext}} entries stay in the {{indexOrderedCursorUpdates}} list.
> Personally, I would expect some "remove" operation to be done in the {{Queue.CursorAddSync.afterRollback()}} method. A similar operation should probably be done in the {{Queue.doMessageSend()}} method, at the place where the {{Exception}} from "addMessage" is handled when no JMS transaction is used. Alternatively, some different "completion" operation should be introduced, because the {{MessageContext}} is only added to the list and is never removed in case of failure.
> When I tried to register (and use) {{LeaseLockerIOExceptionHandler}} as the IOExceptionHandler, the transports were successfully restarted, but my "client" code was blocked in the {{ActiveMQSession.commit()}} method. Is this expected behavior?
> When I tried adding the following code to {{Queue.CursorAddSync.afterRollback()}}, I received only the 3 expected messages (when a JMS transaction is used). It was only a blind shot, sorry, because I don't understand the whole logic here.
> {code:java}
> @Override
> public void afterRollback() throws Exception {
>     synchronized(indexOrderedCursorUpdates) {
>         // Scan from the newest entry back to find the pending update for this message.
>         for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
>             MessageContext mc = indexOrderedCursorUpdates.get(i);
>             if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
>                 // Drop the stale entry so that a later commit does not add it to the cursor.
>                 indexOrderedCursorUpdates.remove(mc);
>                 if(mc.onCompletion != null) {
>                     mc.onCompletion.run();
>                 }
>                 break;
>             }
>         }
>     }
>     messageContext.message.decrementReferenceCount();
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
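
For illustration only, the mechanism described in the report can be reproduced with a toy model that does not use any ActiveMQ classes. Everything in the sketch below is hypothetical: {{ToyQueue}}, {{PendingAdd}} and {{runScenario}} are made-up names, {{pendingUpdates}} stands in for {{indexOrderedCursorUpdates}}, and a single {{afterCommit()}} call draining the whole list is a simplification of {{Queue.doPendingCursorAdditions()}}. {{PendingAdd}} defines equals/hashCode on the message id, so a plain {{List.remove(Object)}} is enough on rollback, which is the alternative raised in the comment above. It is a sketch under these assumptions, not the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingCursorUpdatesSketch {

    /** Hypothetical stand-in for MessageContext; equality is based on the message id only. */
    static final class PendingAdd {
        final String messageId;

        PendingAdd(String messageId) {
            this.messageId = messageId;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof PendingAdd
                    && ((PendingAdd) other).messageId.equals(messageId);
        }

        @Override
        public int hashCode() {
            return messageId.hashCode();
        }
    }

    /** Hypothetical toy queue: pendingUpdates plays the role of indexOrderedCursorUpdates. */
    static final class ToyQueue {
        private final List<PendingAdd> pendingUpdates = new ArrayList<>();
        private final List<String> cursor = new ArrayList<>();
        private final boolean cleanUpOnRollback;

        ToyQueue(boolean cleanUpOnRollback) {
            this.cleanUpOnRollback = cleanUpOnRollback;
        }

        /** Called by the store for every message added inside a transaction. */
        void onAdd(String messageId) {
            synchronized (pendingUpdates) {
                pendingUpdates.add(new PendingAdd(messageId));
            }
        }

        /** Moves every pending entry to the cursor, then clears the pending list. */
        void afterCommit() {
            synchronized (pendingUpdates) {
                for (PendingAdd pending : pendingUpdates) {
                    cursor.add(pending.messageId);
                }
                pendingUpdates.clear();
            }
        }

        /** With cleanup enabled, removes one matching entry via equals(); otherwise does nothing. */
        void afterRollback(String messageId) {
            if (!cleanUpOnRollback) {
                return; // models the reported behavior: the entry stays in the list
            }
            synchronized (pendingUpdates) {
                pendingUpdates.remove(new PendingAdd(messageId));
            }
        }

        List<String> cursorContents() {
            return new ArrayList<>(cursor);
        }
    }

    /** Replays the reported scenario and returns what a consumer would receive. */
    static List<String> runScenario(boolean cleanUpOnRollback) {
        ToyQueue queue = new ToyQueue(cleanUpOnRollback);

        queue.onAdd("1");          // first transaction: one message, commit succeeds
        queue.afterCommit();

        queue.onAdd("2");          // second transaction: persistence fails, rollback
        queue.onAdd("3");
        queue.afterRollback("2");
        queue.afterRollback("3");

        queue.onAdd("2");          // retry of "the same" two messages, commit succeeds
        queue.onAdd("3");
        queue.afterCommit();

        return queue.cursorContents();
    }

    public static void main(String[] args) {
        System.out.println(runScenario(false)); // prints [1, 2, 3, 2, 3]
        System.out.println(runScenario(true));  // prints [1, 2, 3]
    }
}
```

Without cleanup on rollback, the entries for messages 2 and 3 from the failed transaction survive and are delivered again after the retry, which matches the 1, 2, 3, 2, 3 sequence from the report. Whether the real code should also run the completion callback on rollback, as the proposed patch does, is left open here.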