[
https://issues.apache.org/jira/browse/AMQ-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Radek Kraus updated AMQ-6891:
-----------------------------
Description:
I have following scenario (see attached test case):
# Send 1 message in JMS transaction
# Enable database problem simulation (throw {{SQLException}} in
{{TransactionContext.executeBatch()}} method - the similar situation should
happen, when commit fails)
# Attempt to send 2 messages in one JMS transaction, send operation fails as
is expected (only 1 message is in database from first send operation)
# Disable database problem simulation ({{SQLException}} is not thrown from now)
# Repeat the attempt to send "the same" 2 messages in one JMS transaction,
send operation is successful now, how is expected (3 messages are in database)
# Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 3,
2, 3.
I have suspicion, that problem is in
{{org.apache.activemq.broker.region.Queue}}. It seems that reason is
{{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method
is invoked for each message by {{JDBCMessageStore.addMessage(ConnectionContext,
Message) method}}, which adds {{MessageContext}} into this list. The added
{{MessageContext}} is processed (and removed) in
{{Queue.doPendingCursorAdditions()}} method, which is invoked only from
"afterCommit synchronization" ({{Queue.CursorAddSync.afterCommit()}} method).
But when commit operation fails, then "afterCommit" method is not invoked (but
{{afterRollback}} method is invoked) and {{MessageContext}} entries stays in
{{indexOrderedCursorUpdates}} list.
Personaly I would expect, that some "remove" operation should be done in
{{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation
should be done in {{Queue.doMessageSend()}} method on place, where
{{Exception}} from "addMessage" is handled in case when JMS transaction is not
used. Or some different "completion" operation should be introduced, because
{{MessageContext}} is only add into the list, but don't removed in case of
failure.
When I tried to register (and use) {{LeaseLockerIOExceptionHandler}}
IOExceptionHandler, the transports was successfully restarted, but my "client"
code was blocked in {{ActiveMQSession.commit()}} method. Is it expected
behavior?
When I tried to add following code into
{{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages
(when JMS transaction is used), but it was only blind shot, sorry, because I
don't understand the whole logic here.
{code:java}
@Override
public void afterRollback() throws Exception {
synchronized(indexOrderedCursorUpdates) {
for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
MessageContext mc = indexOrderedCursorUpdates.get(i);
if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
indexOrderedCursorUpdates.remove(mc);
if(mc.onCompletion != null) {
mc.onCompletion.run();
}
break;
}
}
}
messageContext.message.decrementReferenceCount();
}
{code}
was:
I have following scenario (see attached test case):
# Send 1 message in JMS transaction
# Enable database problem simulation (throw {{SQLException}} in
{{TransactionContext.executeBatch()}} method - the similar situation should
happen, when commit fails)
# Attempt to send 2 messages in one JMS transaction, send operation fails as
is expected (only 1 message is in database from first send operation)
# Disable database problem simulation ({{SQLException}} is not thrown from now)
# Repeat the attempt to send "the same" 2 messages in one JMS transaction,
send operation is successful now, how is expected (3 messages are in database)
# Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 3,
2, 3.
I have suspicion, that problem is in
{{org.apache.activemq.broker.region.Queue}}. It seems that reason is
{{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} method
is invoked for each message by {{JDBCMessageStore.addMessage(ConnectionContext,
Message) method}}, which adds {{MessageContext}} into this list. The added
{{MessageContext}} is processed (and removed) in
{{Queue.doPendingCursorAdditions()}} method, which is invoked only from
"afterCommit synchronization" ({{Queue.CursorAddSync.afterCommit()}} method).
But when commit operation fails, then "afterCommit" method is not invoked (but
{{afterRollback}} method is invoked) and {{MessageContext}} entries stays in
{{indexOrderedCursorUpdates}} list.
Personaly I would expect, that some "remove" operation should be done in
{{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation
should be done in {{Queue.doMessageSend()}} method on place, where
{{Exception}} from "addMessage" is handled in case when JMS transaction is not
used. Or some different "completion" operation should be introduced, because
{{MessageContext}} is only add into the list, but don't removed in case of
failure.
When I tried to register (and use) {{LeaseLockerIOExceptionHandler
}}IOExceptionHandler, the transports was successfully restarted, but my
"client" code was blocked in {{ActiveMQSession.commit()}} method. Is it
expected behavior?
When I tried to add following code into
{{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages
(when JMS transaction is used), but it was only blind shot, sorry, because I
don't understand the whole logic here.
{code:java}
@Override
public void afterRollback() throws Exception {
synchronized(indexOrderedCursorUpdates) {
for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
MessageContext mc = indexOrderedCursorUpdates.get(i);
if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
indexOrderedCursorUpdates.remove(mc);
if(mc.onCompletion != null) {
mc.onCompletion.run();
}
break;
}
}
}
messageContext.message.decrementReferenceCount();
}
{code}
> Duplicated message in JMS transaction, when jdbc persistence fails
> ------------------------------------------------------------------
>
> Key: AMQ-6891
> URL: https://issues.apache.org/jira/browse/AMQ-6891
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.15.2
> Reporter: Radek Kraus
> Priority: Major
> Attachments: JmsTransactionCommitFailureTest.java
>
>
> I have following scenario (see attached test case):
> # Send 1 message in JMS transaction
> # Enable database problem simulation (throw {{SQLException}} in
> {{TransactionContext.executeBatch()}} method - the similar situation should
> happen, when commit fails)
> # Attempt to send 2 messages in one JMS transaction, send operation fails as
> is expected (only 1 message is in database from first send operation)
> # Disable database problem simulation ({{SQLException}} is not thrown from
> now)
> # Repeat the attempt to send "the same" 2 messages in one JMS transaction,
> send operation is successful now, how is expected (3 messages are in database)
> # Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2,
> 3, 2, 3.
> I have suspicion, that problem is in
> {{org.apache.activemq.broker.region.Queue}}. It seems that reason is
> {{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}}
> method is invoked for each message by
> {{JDBCMessageStore.addMessage(ConnectionContext, Message) method}}, which
> adds {{MessageContext}} into this list. The added {{MessageContext}} is
> processed (and removed) in {{Queue.doPendingCursorAdditions()}} method, which
> is invoked only from "afterCommit synchronization"
> ({{Queue.CursorAddSync.afterCommit()}} method). But when commit operation
> fails, then "afterCommit" method is not invoked (but {{afterRollback}} method
> is invoked) and {{MessageContext}} entries stays in
> {{indexOrderedCursorUpdates}} list.
> Personaly I would expect, that some "remove" operation should be done in
> {{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation
> should be done in {{Queue.doMessageSend()}} method on place, where
> {{Exception}} from "addMessage" is handled in case when JMS transaction is
> not used. Or some different "completion" operation should be introduced,
> because {{MessageContext}} is only add into the list, but don't removed in
> case of failure.
> When I tried to register (and use) {{LeaseLockerIOExceptionHandler}}
> IOExceptionHandler, the transports was successfully restarted, but my
> "client" code was blocked in {{ActiveMQSession.commit()}} method. Is it
> expected behavior?
> When I tried to add following code into
> {{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages
> (when JMS transaction is used), but it was only blind shot, sorry, because I
> don't understand the whole logic here.
> {code:java}
> @Override
> public void afterRollback() throws Exception {
> synchronized(indexOrderedCursorUpdates) {
> for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
> MessageContext mc = indexOrderedCursorUpdates.get(i);
>
> if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
> indexOrderedCursorUpdates.remove(mc);
> if(mc.onCompletion != null) {
> mc.onCompletion.run();
> }
> break;
> }
> }
> }
> messageContext.message.decrementReferenceCount();
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)