Alex Rudyy created QPID-8551:
--------------------------------

             Summary: [JMS AMQP 0-x] The reject performatives are sent twice 
on rollback of asynchronous consumer transactions
                 Key: QPID-8551
                 URL: https://issues.apache.org/jira/browse/QPID-8551
             Project: Qpid
          Issue Type: Bug
          Components: JMS AMQP 0-x
    Affects Versions: qpid-java-client-0-x-6.4.0, qpid-java-client-0-x-6.3.4, 
qpid-java-client-0-x-6.3.3, qpid-java-client-0-x-6.3.2, 
qpid-java-client-0-x-6.3.1, qpid-java-client-0-x-6.3.0
            Reporter: Alex Rudyy


When a consumer transaction is rolled back, the dispatch queue is cleaned by 
invoking {{syncDispatchQueue(false)}}. That results in the invocation of 
{{dispatchMessage(UnprocessedMessage message)}}, which sends reject 
commands for all delivery tags at or below the rollback mark:
{code}
if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get()))
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Rejecting message because delivery tag " + deliveryTag
                      + " <= rollback mark " + _rollbackMark.get());
    }
    rejectMessage(message, true);
}
{code}

However, the code above does not remove the rejected message's delivery tag 
from {{_deliveredMessageTags}}. 

As a result, the rejects are sent again when {{releaseForRollback()}} is 
invoked from {{AMQSession#rollback()}}.

The issue affects only consumer transactions where a {{MessageListener}} is 
used to receive messages.

A workaround for the defect is to use a synchronous consumer to receive 
messages.

It looks like the issue can be fixed by removing the rejected message's 
delivery tag from {{_deliveredMessageTags}} in 
{{dispatchMessage(UnprocessedMessage message)}}:
{code}
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 125cba1dd..7f7a5a283 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3621,6 +3621,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
                                 + " <= rollback mark " + _rollbackMark.get());
                     }
                     rejectMessage(message, true);
+                    getDeliveredMessageTags().remove(deliveryTag);
                 }
                 else if (_usingDispatcherForCleanup)
                 {
{code}
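
To make the sequence described above easier to follow, here is a minimal standalone sketch of the double reject and of the effect of the proposed one-line change. It is a toy model and not the real client code: the class {{RollbackRejectSketch}}, its fields and the {{removeTagOnReject}} flag are hypothetical, and it assumes that a delivery tag is already recorded in the delivered-tags collection while the message is still waiting in the dispatch queue. Only the method names mirror those mentioned in this issue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model of the rollback path described in this issue (NOT the real AMQSession).
final class RollbackRejectSketch
{
    // Messages prefetched for an asynchronous consumer, not yet handed to the listener.
    private final Deque<Long> dispatchQueue = new ArrayDeque<>();
    // Stand-in for _deliveredMessageTags (assumption: filled when the message arrives).
    private final ConcurrentLinkedQueue<Long> deliveredMessageTags = new ConcurrentLinkedQueue<>();
    // Every reject "sent to the broker", in order.
    private final List<Long> rejectsSent = new ArrayList<>();
    // Hypothetical switch: true models the proposed fix, false the current behaviour.
    private final boolean removeTagOnReject;
    private long highestDeliveryTag = -1;
    private long rollbackMark = -1;

    RollbackRejectSketch(boolean removeTagOnReject)
    {
        this.removeTagOnReject = removeTagOnReject;
    }

    void messageReceived(Long deliveryTag)
    {
        highestDeliveryTag = Math.max(highestDeliveryTag, deliveryTag);
        dispatchQueue.add(deliveryTag);
        deliveredMessageTags.add(deliveryTag);
    }

    void rollback()
    {
        rollbackMark = highestDeliveryTag;
        syncDispatchQueue();
        releaseForRollback();
    }

    // Drains the dispatch queue through dispatchMessage, as on rollback.
    private void syncDispatchQueue()
    {
        Long tag;
        while ((tag = dispatchQueue.poll()) != null)
        {
            dispatchMessage(tag);
        }
    }

    private void dispatchMessage(Long deliveryTag)
    {
        if (deliveryTag <= rollbackMark)
        {
            rejectsSent.add(deliveryTag);                 // first reject
            if (removeTagOnReject)
            {
                deliveredMessageTags.remove(deliveryTag); // the proposed fix
            }
        }
    }

    // Rejects every tag still recorded as delivered.
    private void releaseForRollback()
    {
        Long tag;
        while ((tag = deliveredMessageTags.poll()) != null)
        {
            rejectsSent.add(tag);                         // second reject if the tag was not removed
        }
    }

    static List<Long> rejectsAfterRollback(boolean removeTagOnReject)
    {
        RollbackRejectSketch session = new RollbackRejectSketch(removeTagOnReject);
        session.messageReceived(1L);
        session.messageReceived(2L);
        session.messageReceived(3L);
        session.rollback();
        return session.rejectsSent;
    }

    public static void main(String[] args)
    {
        System.out.println(rejectsAfterRollback(false)); // prints [1, 2, 3, 1, 2, 3]
        System.out.println(rejectsAfterRollback(true));  // prints [1, 2, 3]
    }
}
```

In this model each tag is rejected twice when the tag is left in the delivered-tags collection, and once when it is removed in {{dispatchMessage}}. Whether the real client behaves exactly this way depends on details the sketch leaves out.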

On the Qpid Broker-J side, the duplicate rejects cause WARN logs like the one below:
{noformat}
 (o.a.q.s.p.v.AMQChannel) - Dropping reject request as message is null for tag:1102
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
