rdhabalia commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event URL: https://github.com/apache/pulsar/pull/5276#discussion_r329693205
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ########## @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() { if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId); } - // redeliver unacked-msgs - subscription.redeliverUnacknowledgedMessages(this); - flowConsumerBlockedPermits(this); + if (pendingAcks != null) { - AtomicInteger totalRedeliveryMessages = new AtomicInteger(0); - pendingAcks.forEach( - (ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize)); - msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get()); - pendingAcks.clear(); + List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size()); + MutableInt totalRedeliveryMessages = new MutableInt(0); + pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { + totalRedeliveryMessages.add((int) batchSize); + pendingPositions.add(new PositionImpl(ledgerId, entryId)); + }); + + for (PositionImpl p : pendingPositions) { + pendingAcks.remove(p.getLedgerId(), p.getEntryId()); Review comment: `pendingPositions` will have same number of positions as `pendingAcks`.. so, why can't we just clear `pendingAcks` here and anyway, `subscription:: redeliverUnacknowledgedMessages` is happening later. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services