This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9deee37f0b4 [fix][client] PIP-84: Skip processing a message in the
message listener if the consumer epoch is no longer valid (#25007)
9deee37f0b4 is described below
commit 9deee37f0b42c1899965c5f19440c62cb35eb9df
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Nov 24 14:35:55 2025 +0200
[fix][client] PIP-84: Skip processing a message in the message listener if
the consumer epoch is no longer valid (#25007)
---
.../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 67306e16c09..c4b413055d4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1210,6 +1210,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
// after enabled message listener.
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg
instanceof TopicMessageImpl
? ((TopicMessageImpl<T>) msg).getMessage() :
msg));
+
+ MessageImpl<T> innerMessage = (MessageImpl<T>) (msg instanceof
TopicMessageImpl
+ ? ((TopicMessageImpl<T>) msg).getMessage() : msg);
+ if (!receivedConsumer.isValidConsumerEpoch(innerMessage)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Skipping processing message since the
consumer epoch is not valid. {}", topic,
+ subscription, msg.getMessageId());
+ }
+ return;
+ }
+
MessageId id;
if (this instanceof ConsumerImpl) {
id = MessageIdAdvUtils.discardBatch(msg.getMessageId());