rdhabalia commented on a change in pull request #5276: Fixed race condition 
while triggering message redelivery after an ack-timeout event
URL: https://github.com/apache/pulsar/pull/5276#discussion_r329693205
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() {
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] consumer {} received redelivery", topicName, 
subscription, consumerId);
         }
-        // redeliver unacked-msgs
-        subscription.redeliverUnacknowledgedMessages(this);
-        flowConsumerBlockedPermits(this);
+
         if (pendingAcks != null) {
-            AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
-            pendingAcks.forEach(
-                    (ledgerId, entryId, batchSize, none) -> 
totalRedeliveryMessages.addAndGet((int) batchSize));
-            msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), 
totalRedeliveryMessages.get());
-            pendingAcks.clear();
+            List<PositionImpl> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
+            MutableInt totalRedeliveryMessages = new MutableInt(0);
+            pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
+                totalRedeliveryMessages.add((int) batchSize);
+                pendingPositions.add(new PositionImpl(ledgerId, entryId));
+            });
+
+            for (PositionImpl p : pendingPositions) {
+                pendingAcks.remove(p.getLedgerId(), p.getEntryId());
 
 Review comment:
   `pendingPositions` will have same number of positions as `pendingAcks`.. so, 
why can't we just clear `pendingAcks` here and anyway, `subscription:: 
redeliverUnacknowledgedMessages` is happening later.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to