[
https://issues.apache.org/jira/browse/AMQ-7314?focusedWorklogId=396232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-396232
]
ASF GitHub Bot logged work on AMQ-7314:
---------------------------------------
Author: ASF GitHub Bot
Created on: 02/Mar/20 16:56
Start Date: 02/Mar/20 16:56
Worklog Time Spent: 10m
Work Description: jbonofre commented on pull request #489: [AMQ-7314] Fix
counter increment when lastDelivereSeqId > BrokerSequenceId
URL: https://github.com/apache/activemq/pull/489
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 396232)
Time Spent: 20m (was: 10m)
> Redelivery counter is not getting incremented when LastDeliveredSeqID >
> BrokerSequenceId
> ----------------------------------------------------------------------------------------
>
> Key: AMQ-7314
> URL: https://issues.apache.org/jira/browse/AMQ-7314
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.14.4, 5.15.10
> Reporter: Dhananjay
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Fix For: 5.16.0, 5.15.12
>
> Attachments: ActiveMQ_RedeliveryCounter_Fix_Diff.htm
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Issue: Redelivery counter is not getting incremented when LastDeliveredSeqID
> > BrokerSequenceId
> Version tested : ActiveMQ 5.14.4 and 5.15.10 versions
> Description:
> I have created the two queues, one for sending requests and other for
> receiving its response.
> The steps for these queues are given below.
>
> Steps:
> 1. SenderA (Machine A) sends Msg on RequestQueue TO ReceiverB (Machine B)
> (Redelivery count is 0, Ack Mode:INDIVIDUAL_ACKNOWLEDGE).
> 2. SenderB sends ResponseMsg on Response queue (Machine B) TO ReceiverA
> (Machine A) (Async Msg Listener, Ack Mode:CLIENT_ACKNOWLEDGE)
> 3. SenderB is waiting for the consumed advisory for ResponseMsg. Request Msg
> is not yet acknowledged.
>
> 4. ReceiverA acknowledges the ResponseMsg. ResponseMsg consumed advisory is
> received by SenderB
> 5. Machine B disconnects from the embedded broker hosted on Machine A
>
> 6. On Machine B reconnection, Broker sends Msg on RequestQueue again ->
> ReceiverB (Issue: Redelivery count is 0 instead of 1)
> Analysis:
> 1. The Queue.java has removeSubscription() function which increments the
> redelivery counter by comparing lastDeliveredSequenceId and BrokerSequenceId.
> 2. The step4 incremented the LastDeliveredSeqID for message consumed
> advisory.
> 3. The transport connection updates the lastDeliveredSequenceId for each
> consumer by calling
> processRemoveConsumer(consumerId, lastDeliveredSequenceId);
> 4. The condition “ref.getMessageId().getBrokerSequenceId() ==
> lastDeliveredSequenceId” in removeSubscription() doesn’t set
> markAsRedelivered to true.
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: queue://RequestQueue_334 remove sub: QueueSubscription:
> consumer=ID:5107-1569853470047-1:1:1:1, destinations=1, dispatched=2,
> delivered=2, pending=0, prefetch=1, prefetchExtension=2, lastDeliveredSeqId:
> 17, dequeues: 0, dispatched: 2, inflight: 2, groups: 0
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference:
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:1, BrokerSequenceId 7
> org.apache.activemq.broker.region.Queue removeSubscription
> FINE: LastDeliveredSeqID: 17, message reference:
> ID:WIN2K8R2-7986-1569853472713-4:1:1:1:2, BrokerSequenceId 8
> Queue.java , removeSubscription()
> Existing Code in 5.14.4:
> // locate last redelivered in unconsumed list (list in delivery rather than
> seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
> for (MessageReference ref : unAckedMessages) {
> if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
> lastDeliveredRef = ref;
> markAsRedelivered = true;
> LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
> lastDeliveredSequenceId, ref.getMessageId());
> break;
> }
> }
> }
> Suggested fix:
> // locate last redelivered in unconsumed list (list in delivery rather than
> seq order)
> if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
> for (MessageReference ref : unAckedMessages) {
> LOG.debug("UnAcked message reference: {}, BrokerSequenceId {}",
> ref.getMessageId(), ref.getMessageId().getBrokerSequenceId());
> if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 &&
> ref.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId))
> {
> lastDeliveredRef = ref;
> markAsRedelivered = true;
> LOG.debug("found lastDeliveredSeqID: {}, message reference: {}",
> lastDeliveredSequenceId, ref.getMessageId());
> }
> }
> }
> Could you please review the analysis and fix?
> Thanks,
> Dhananjay
--
This message was sent by Atlassian Jira
(v8.3.4#803005)