lhotari commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1798874588

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
     }
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+        // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request
+        if (consumerEpoch < consumer.getConsumerEpoch()) {
+            log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than "
+                            + "consumer epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
+            return;
+        }
-        if (consumerEpoch > consumer.getConsumerEpoch()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
-                        name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
-            }
-            consumer.setConsumerEpoch(consumerEpoch);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
         }
+        consumer.setConsumerEpoch(consumerEpoch);

Review Comment:
   This doesn't make sense to me. Why is the broker side epoch updated here?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org