This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 281f0158390d622702ac757a9140772c5216061f
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:55 2025 +0200

    [fix][client] PIP-84: Skip processing a message in the message listener if the consumer epoch is no longer valid (#25007)

    (cherry picked from commit 9deee37f0b42c1899965c5f19440c62cb35eb9df)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 8035c1cbdd4..f99860656ca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1206,6 +1206,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             // after enabled message listener.
             receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
                     ? ((TopicMessageImpl<T>) msg).getMessage() : msg));
+
+            MessageImpl<T> innerMessage = (MessageImpl<T>) (msg instanceof TopicMessageImpl
+                    ? ((TopicMessageImpl<T>) msg).getMessage() : msg);
+            if (!receivedConsumer.isValidConsumerEpoch(innerMessage)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Skipping processing message since the consumer epoch is not valid. {}", topic,
+                            subscription, msg.getMessageId());
+                }
+                return;
+            }
+
             MessageId id;
             if (this instanceof ConsumerImpl) {
                 id = MessageIdAdvUtils.discardBatch(msg.getMessageId());
