This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e9f49f25f5f1df2c5d91e8305eaf942dd7bd8552
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:55 2025 +0200

    [fix][client] PIP-84: Skip processing a message in the message listener if the consumer epoch is no longer valid (#25007)
    
    (cherry picked from commit 9deee37f0b42c1899965c5f19440c62cb35eb9df)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ffe9b319338..af97634d08f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1189,6 +1189,17 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
             // after enabled message listener.
             receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg 
instanceof TopicMessageImpl
                                 ? ((TopicMessageImpl<T>) msg).getMessage() : 
msg));
+
+            MessageImpl<T> innerMessage = (MessageImpl<T>) (msg instanceof 
TopicMessageImpl
+                    ? ((TopicMessageImpl<T>) msg).getMessage() : msg);
+            if (!receivedConsumer.isValidConsumerEpoch(innerMessage)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Skipping processing message since the 
consumer epoch is not valid. {}", topic,
+                            subscription, msg.getMessageId());
+                }
+                return;
+            }
+
             MessageId id;
             if (this instanceof ConsumerImpl) {
                 id = MessageIdAdvUtils.discardBatch(msg.getMessageId());

Reply via email to