lhotari commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1798874588


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+        // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request
+        if (consumerEpoch < consumer.getConsumerEpoch()) {
+            log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than "
+                            + "consumer epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
+            return;
+        }
 
-        if (consumerEpoch > consumer.getConsumerEpoch()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
-                        name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
-            }
-            consumer.setConsumerEpoch(consumerEpoch);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
         }
 
+        consumer.setConsumerEpoch(consumerEpoch);

Review Comment:
   This doesn't make sense to me. Why is the broker side epoch updated here?
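
To make the question concrete, here is a minimal sketch of the two epoch rules visible in the hunk. It assumes epochs can be modeled as plain `long` values; `EpochSketch` and its method names are hypothetical and are not part of Pulsar, and the sketch covers only the lines shown in this hunk, not the rest of the method.

```java
// Hypothetical model: epochs are plain long values; this is not Pulsar's Consumer class.
class EpochSketch {

    // Rule on the removed (-) lines: advance the stored epoch only when
    // the epoch carried by the request is larger.
    static long storedEpochBefore(long storedEpoch, long requestEpoch) {
        if (requestEpoch > storedEpoch) {
            return requestEpoch;
        }
        return storedEpoch;
    }

    // Rule on the added (+) lines: a request with a smaller epoch returns early;
    // any other request overwrites the stored epoch unconditionally.
    static long storedEpochAfter(long storedEpoch, long requestEpoch) {
        if (requestEpoch < storedEpoch) {
            return storedEpoch; // early return, stored epoch untouched
        }
        return requestEpoch;
    }

    // True when the added lines return before reaching the rest of the method.
    static boolean returnsEarlyAfter(long storedEpoch, long requestEpoch) {
        return requestEpoch < storedEpoch;
    }

    public static void main(String[] args) {
        // Each pair is {storedEpoch, requestEpoch}: smaller, equal, larger request.
        long[][] cases = {{5, 3}, {5, 5}, {5, 7}};
        for (long[] c : cases) {
            System.out.printf("stored=%d request=%d before=%d after=%d earlyReturn=%b%n",
                    c[0], c[1],
                    storedEpochBefore(c[0], c[1]),
                    storedEpochAfter(c[0], c[1]),
                    returnsEarlyAfter(c[0], c[1]));
        }
    }
}
```

Under this model both rules leave the same stored value, the larger of the two epochs. Within the hunk, the behavioral difference is the early return and warning for a request whose epoch is smaller than the stored one.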


