Dhananjay created AMQ-7314:
------------------------------

             Summary: Redelivery counter is not getting incremented when 
LastDeliveredSeqID > BrokerSequenceId
                 Key: AMQ-7314
                 URL: https://issues.apache.org/jira/browse/AMQ-7314
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.15.10, 5.14.4
            Reporter: Dhananjay


Issue: Redelivery counter is not getting incremented when LastDeliveredSeqID > 
BrokerSequenceId

Version tested : ActiveMQ 5.14.4 and 5.15.10 versions

Description:
I have created the two queues, one for sending requests and other for receiving 
its response.
The steps for these queues are given below.
 
Steps:

1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B) 
(Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE).

2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA 
(Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE)

3. SenderB is waiting for the consumed advisory for ResponseMsg. Request Msg is 
not yet acknowledged.
 
 4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is 
received by SenderB

5. Machine B disconnects from the embedded broker hosted on Machine A
 
 6. On Machine B reconnection, Broker sends Msg on RequestQueue again -> 
ReceiverB (Issue: Redelivery count is 0 instead of 1)

Analysis:
1. The Queue.java has removeSubscription() function which increments the 
redelivery counter by comparing lastDeliveredSequenceId and BrokerSequenceId.
2. The step4 incremented the LastDeliveredSeqID for message consumed advisory. 
3. The transport connection updates the lastDeliveredSequenceId for each 
consumer by calling
 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
 4. The condition “ref.getMessageId().getBrokerSequenceId() == 
lastDeliveredSequenceId” in removeSubscription() doesn’t set markAsRedelivered 
to true.

org.apache.activemq.broker.region.Queue removeSubscription
FINE: queue://RequestQueue_334 remove sub: QueueSubscription: 
consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2, 
delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId: 
17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
org.apache.activemq.broker.region.Queue removeSubscription
FINE: LastDeliveredSeqID: 17, message reference: 
ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
org.apache.activemq.broker.region.Queue removeSubscription
FINE: LastDeliveredSeqID: 17, message reference: 
ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8


Queue.java , removeSubscription()

Existing Code in 5.14.4:

// locate last redelivered in unconsumed list (list in delivery rather than seq 
order)
 if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
 for (MessageReference ref : unAckedMessages) {
 if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
 lastDeliveredRef = ref;
 markAsRedelivered = true;
 LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", 
lastDeliveredSequenceId, ref.getMessageId());
 break;
 }
 }
 }

Suggested fix:

// locate last redelivered in unconsumed list (list in delivery rather than seq 
order)
 if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
 for (MessageReference ref : unAckedMessages) {
 LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}", 
ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
 if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && 
ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) 
 {
 lastDeliveredRef = ref;
 markAsRedelivered = true;
 LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", 
lastDeliveredSequenceId, ref.getMessageId());
 }
 }
 }

Could you please review the analysis and fix?

Thanks, 
Dhananjay



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to