[ 
https://issues.apache.org/jira/browse/AMQ-7314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré reassigned AMQ-7314:
-----------------------------------------

    Assignee: Jean-Baptiste Onofré

> Redelivery counter is not getting incremented when LastDeliveredSeqID > 
> BrokerSequenceId
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-7314
>                 URL: https://issues.apache.org/jira/browse/AMQ-7314
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.14.4, 5.15.10
>            Reporter: Dhananjay
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>         Attachments: ActiveMQ_RedeliveryCounter_Fix_Diff.htm
>
>
> Issue: Redelivery counter is not incremented when LastDeliveredSeqID is
> greater than BrokerSequenceId
> Versions tested: ActiveMQ 5.14.4 and 5.15.10
> Description:
> I created two queues: one for sending requests and the other for
> receiving the responses.
> The steps performed on these queues are given below.
>  
> Steps:
> 1. SenderA (Machine A) sends Msg on RequestQueue to ReceiverB (Machine B)
> (redelivery count is 0, ack mode: INDIVIDUAL_ACKNOWLEDGE).
> 2. SenderB sends ResponseMsg on Response queue (Machine B) to ReceiverA
> (Machine A) (async message listener, ack mode: CLIENT_ACKNOWLEDGE).
> 3. SenderB waits for the consumed advisory for ResponseMsg. The request Msg
> is not yet acknowledged.
> 4. ReceiverA acknowledges ResponseMsg. SenderB receives the consumed
> advisory for ResponseMsg.
> 5. Machine B disconnects from the embedded broker hosted on Machine A.
> 6. When Machine B reconnects, the broker sends Msg on RequestQueue to
> ReceiverB again (issue: redelivery count is 0 instead of 1).
> Analysis:
> 1. removeSubscription() in Queue.java decides whether to increment the
> redelivery counter by comparing lastDeliveredSequenceId with the
> BrokerSequenceId of each unacknowledged message.
> 2. Step 4 advanced LastDeliveredSeqID because of the message consumed
> advisory.
> 3. The transport connection updates lastDeliveredSequenceId for each
> consumer by calling
>  processRemoveConsumer(consumerId, lastDeliveredSequenceId);
> 4. The condition "ref.getMessageId().getBrokerSequenceId() ==
> lastDeliveredSequenceId" in removeSubscription() never holds, so
> markAsRedelivered is not set to true. In the log below, LastDeliveredSeqID
> is 17 while the unacknowledged messages have BrokerSequenceId 7 and 8.
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: queue://RequestQueue_334 remove sub: QueueSubscription: 
> consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2, 
> delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId: 
> 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: 
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference: 
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8
> Queue.java, removeSubscription()
> Existing code in 5.14.4:
> // locate last redelivered in unconsumed list
> // (list in delivery rather than seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>     for (MessageReference ref : unAckedMessages) {
>         if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
>             lastDeliveredRef = ref;
>             markAsRedelivered = true;
>             LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
>                     lastDeliveredSequenceId, ref.getMessageId());
>             break;
>         }
>     }
> }
> Suggested fix:
> // locate last redelivered in unconsumed list
> // (list in delivery rather than seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
>     for (MessageReference ref : unAckedMessages) {
>         LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}",
>                 ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
>         if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 &&
>                 ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
>             lastDeliveredRef = ref;
>             markAsRedelivered = true;
>             LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
>                     lastDeliveredSequenceId, ref.getMessageId());
>         }
>     }
> }
> Could you please review the analysis and fix?
> Thanks, 
> Dhananjay
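
The difference between the two conditions quoted above can be checked
without a broker. The following is a minimal, self-contained sketch, not
ActiveMQ code: the class RedeliveryCheckSketch, both method names, and the
value -1 used for LAST_DELIVERED_UNSET are assumptions made for
illustration, and plain long values stand in for MessageReference objects.
It replays the values from the log in the report (LastDeliveredSeqID 17,
BrokerSequenceId 7 and 8).

```java
class RedeliveryCheckSketch {
    // Assumption: stand-in for RemoveInfo.LAST_DELIVERED_UNSET; the report
    // does not show the real constant's value.
    static final long LAST_DELIVERED_UNSET = -1L;

    // Mirrors the existing condition: flag only on an exact sequence-id match.
    static boolean existingCheck(long lastDeliveredSequenceId, long[] unackedBrokerSequenceIds) {
        boolean markAsRedelivered = false;
        if (lastDeliveredSequenceId > LAST_DELIVERED_UNSET) {
            for (long brokerSequenceId : unackedBrokerSequenceIds) {
                if (brokerSequenceId == lastDeliveredSequenceId) {
                    markAsRedelivered = true;
                    break;
                }
            }
        }
        return markAsRedelivered;
    }

    // Mirrors the suggested condition: flag when the last delivered id is 0,
    // or when an unacked message's id is at or below the last delivered id.
    static boolean suggestedCheck(long lastDeliveredSequenceId, long[] unackedBrokerSequenceIds) {
        boolean markAsRedelivered = false;
        if (lastDeliveredSequenceId > LAST_DELIVERED_UNSET) {
            for (long brokerSequenceId : unackedBrokerSequenceIds) {
                if (lastDeliveredSequenceId == 0
                        || (lastDeliveredSequenceId > 0
                            && brokerSequenceId <= lastDeliveredSequenceId)) {
                    markAsRedelivered = true;
                }
            }
        }
        return markAsRedelivered;
    }

    public static void main(String[] args) {
        long[] unacked = {7L, 8L}; // BrokerSequenceId values from the log
        System.out.println("existing check:  " + existingCheck(17L, unacked));
        System.out.println("suggested check: " + suggestedCheck(17L, unacked));
    }
}
```

With the logged values, the equality check leaves the flag unset, while the
suggested check sets it for both unacknowledged messages. The suggested
condition flags every unacknowledged message whose BrokerSequenceId is at
or below lastDeliveredSequenceId, so whether that is the intended semantics
is part of what the review would need to confirm.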



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
