[ https://issues.apache.org/jira/browse/AMQ-7314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jean-Baptiste Onofré updated AMQ-7314: -------------------------------------- Fix Version/s: (was: 5.19.0) (was: 5.17.5) (was: 5.18.3) > Redelivery counter is not getting incremented when LastDeliveredSeqID > > BrokerSequenceId > ---------------------------------------------------------------------------------------- > > Key: AMQ-7314 > URL: https://issues.apache.org/jira/browse/AMQ-7314 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 5.14.4, 5.15.10 > Reporter: Dhananjay > Assignee: Jean-Baptiste Onofré > Priority: Major > Attachments: ActiveMQ_RedeliveryCounter_Fix_Diff.htm > > Time Spent: 20m > Remaining Estimate: 0h > > Issue: Redelivery counter is not getting incremented when LastDeliveredSeqID > > BrokerSequenceId > Version tested : ActiveMQ 5.14.4 and 5.15.10 versions > Description: > I have created the two queues, one for sending requests and other for > receiving its response. > The steps for these queues are given below. > > Steps: > 1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B) > (Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE). > 2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA > (Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE) > 3. SenderB is waiting for the consumed advisory for ResponseMsg. Request Msg > is not yet acknowledged. > > 4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is > received by SenderB > 5. Machine B disconnects from the embedded broker hosted on Machine A > > 6. On Machine B reconnection, Broker sends Msg on RequestQueue again -> > ReceiverB (Issue: Redelivery count is 0 instead of 1) > Analysis: > 1. The Queue.java has removeSubscription() function which increments the > redelivery counter by comparing lastDeliveredSequenceId and BrokerSequenceId. > 2. The step4 incremented the LastDeliveredSeqID for message consumed > advisory. > 3. The transport connection updates the lastDeliveredSequenceId for each > consumer by calling > processRemoveConsumer(consumerId, lastDeliveredSequenceId); > 4. The condition “ref.getMessageId().getBrokerSequenceId() == > lastDeliveredSequenceId” in removeSubscription() doesn’t set > markAsRedelivered to true. > org.apache.activemq.broker.region.Queue removeSubscription > FINE: queue://RequestQueue_334 remove sub: QueueSubscription: > consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2, > delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId: > 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0 > org.apache.activemq.broker.region.Queue removeSubscription > FINE: LastDeliveredSeqID: 17, message reference: > ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7 > org.apache.activemq.broker.region.Queue removeSubscription > FINE: LastDeliveredSeqID: 17, message reference: > ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8 > Queue.java , removeSubscription() > Existing Code in 5.14.4: > // locate last redelivered in unconsumed list (list in delivery rather than > seq order) > if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) { > for (MessageReference ref : unAckedMessages) { > if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) { > lastDeliveredRef = ref; > markAsRedelivered = true; > LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", > lastDeliveredSequenceId, ref.getMessageId()); > break; > } > } > } > Suggested fix: > // locate last redelivered in unconsumed list (list in delivery rather than > seq order) > if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) { > for (MessageReference ref : unAckedMessages) { > LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}", > ref.getMessageId(), ref.getMessageId().getBrokerSequenceId()); > if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && > ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) > { > lastDeliveredRef = ref; > markAsRedelivered = true; > LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", > lastDeliveredSequenceId, ref.getMessageId()); > } > } > } > Could you please review the analysis and fix? > Thanks, > Dhananjay -- This message was sent by Atlassian Jira (v8.20.10#820010)