[ 
https://issues.apache.org/jira/browse/AMQ-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341252#comment-16341252
 ] 

Radek Kraus commented on AMQ-6891:
----------------------------------

Thank you for your time and confirmation my issue. Thanks for your great 
reaction time. 
But unfortunately, I think, that problem is more complex. IMHO the invocation 
of \{{rollbackPendingCursorAdditions(MessageContext)}} method only from 
\{{afterRollback()}} solves problem when JMS transaction is used - it is great. 
But the similar problem (entry is not removed from list) occurs even when JMS 
transaction is not used (I noted it in issue description too). The problem is, 
that in this situation the "rollback" method is not invoked at all. In this 
case exception handling in \{{doMessageSend(ProducerBrokerExchange, Message)}} 
doesn't contains remove action too. In normal flow (no exception), the method 
\{{tryOrderedCursorAdd()}} is invoked and remove operation is finally done by 
"standard" \{{doPendingCursorAdditions()}} method. 

{code}
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message 
message) throws IOException,
  ...
  try {
    //AMQ-6133 - don't store async if using persistJMSRedelivered
    //This flag causes a sync update later on dispatch which can cause a race
    //condition if the original add is processed after the update, which can 
cause
    //a duplicate message to be stored
    if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
      result = store.asyncAddQueueMessage(context, message, 
isOptimizeStorage());
      result.addListener(new PendingMarshalUsageTracker(message));
    } else {
      store.addMessage(context, message);
    }
  } catch (Exception e) {
    // we may have a store in inconsistent state, so reset the cursor
    // before restarting normal broker operations
    resetNeeded = true;
    pendingSends.decrementAndGet();
    // !!! 
    // Here should be remove operation too, probably?
    //rollbackPendingCursorAdditions(messageContext);
    // !!!
    throw e;
  }

  ...

  if(tryOrderedCursorAdd(message, context)) {
    break;
  }
{code}

Yes it is my fault, because I didn't provide test case for this situation (send 
message without JMS transaction), sorry. I can extend my test class for this 
test case too. Sorry again.

> Duplicated message in JMS transaction, when jdbc persistence fails (Memory 
> leak on Queue)
> -----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6891
>                 URL: https://issues.apache.org/jira/browse/AMQ-6891
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Radek Kraus
>            Assignee: Gary Tully
>            Priority: Major
>             Fix For: 5.16.0
>
>         Attachments: JmsTransactionCommitFailureTest.java
>
>
> I have following scenario (see attached test case):
>  # Send 1 message in JMS transaction
>  # Enable database problem simulation (throw {{SQLException}} in 
> {{TransactionContext.executeBatch()}} method - the similar situation should 
> happen, when commit fails)
>  # Attempt to send 2 messages in one JMS transaction, send operation fails as 
> is expected (only 1 message is in database from first send operation)
>  # Disable database problem simulation ({{SQLException}} is not thrown from 
> now)
>  # Repeat the attempt to send "the same" 2 messages in one JMS transaction, 
> send operation is successful now, how is expected (3 messages are in database)
>  # Attempt to receive 3 messages 1, 2, 3, but 5 messages are received 1, 2, 
> 3, 2, 3.
> I have suspicion, that problem is in 
> {{org.apache.activemq.broker.region.Queue}}. It seems that reason is 
> {{indexOrderedCursorUpdates}} list. The {{Queue.onAdd(MessageContext)}} 
> method is invoked for each message by 
> {{JDBCMessageStore.addMessage(ConnectionContext, Message) method}}, which 
> adds {{MessageContext}} into this list. The added {{MessageContext}} is 
> processed (and removed) in {{Queue.doPendingCursorAdditions()}} method, which 
> is invoked only from "afterCommit synchronization" 
> ({{Queue.CursorAddSync.afterCommit()}} method). But when commit operation 
> fails, then "afterCommit" method is not invoked (but {{afterRollback}} method 
> is invoked) and {{MessageContext}} entries stays in 
> {{indexOrderedCursorUpdates}} list.
> Personaly I would expect, that some "remove" operation should be done in 
> {{Queue.CursorAddSync.afterRolback()}} method. Probably the similar operation 
> should be done in {{Queue.doMessageSend()}} method on place, where 
> {{Exception}} from "addMessage" is handled in case when JMS transaction is 
> not used. Or some different "completion" operation should be introduced, 
> because {{MessageContext}} is only add into the list,  but don't removed in 
> case of failure.
> When I tried to register (and use) {{LeaseLockerIOExceptionHandler}} 
> IOExceptionHandler, the transports was successfully restarted, but my 
> "client" code was blocked in {{ActiveMQSession.commit()}} method. Is it 
> expected behavior?
> When I tried to add following code into 
> {{Queue.CursorAddSync.afterRollback()}}, I received only 3 expected messages 
> (when JMS transaction is used), but it was only blind shot, sorry, because I 
> don't understand the whole logic here.
> {code:java}
> @Override
> public void afterRollback() throws Exception {
>   synchronized(indexOrderedCursorUpdates) {
>     for(int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
>       MessageContext mc = indexOrderedCursorUpdates.get(i);
>         
> if(mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
>         indexOrderedCursorUpdates.remove(mc);
>         if(mc.onCompletion != null) {
>           mc.onCompletion.run();
>         }
>         break;
>       }
>     }
>   }
>   messageContext.message.decrementReferenceCount();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to