This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e9f49f25f5f1df2c5d91e8305eaf942dd7bd8552
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:55 2025 +0200

    [fix][client] PIP-84: Skip processing a message in the message listener if the consumer epoch is no longer valid (#25007)

    (cherry picked from commit 9deee37f0b42c1899965c5f19440c62cb35eb9df)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ffe9b319338..af97634d08f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1189,6 +1189,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                         // after enabled message listener.
                         receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
                                 ? ((TopicMessageImpl<T>) msg).getMessage() : msg));
+
+                        MessageImpl<T> innerMessage = (MessageImpl<T>) (msg instanceof TopicMessageImpl
+                                ? ((TopicMessageImpl<T>) msg).getMessage() : msg);
+                        if (!receivedConsumer.isValidConsumerEpoch(innerMessage)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}][{}] Skipping processing message since the consumer epoch is not valid. {}", topic,
+                                        subscription, msg.getMessageId());
+                            }
+                            return;
+                        }
+
                         MessageId id;
                         if (this instanceof ConsumerImpl) {
                             id = MessageIdAdvUtils.discardBatch(msg.getMessageId());
