This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a4e910771be3002ec7af499215c4671015c5612
Author: Penghui Li <peng...@apache.org>
AuthorDate: Wed Jul 2 18:58:39 2025 -0700

    [fix][client] NPE in MultiTopicsConsumerImpl.negativeAcknowledge (#24476)
    
    (cherry picked from commit 89167308709cf72d3f2731bad1bb30384f80abb4)
---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f5b89365f8f..451ddaf76db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,6 +566,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(MessageId messageId) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - 
consumer is not ready (state: {})",
+                    topic, subscription, messageId, getState());
+            return;
+        }
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) 
messageId).getOwnerTopic());
         consumer.negativeAcknowledge(messageId);
@@ -574,6 +579,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(Message<?> message) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - 
consumer is not ready (state: {})",
+                    topic, subscription, message.getMessageId(), getState());
+            return;
+        }
         MessageId messageId = message.getMessageId();
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) 
messageId).getOwnerTopic());

Reply via email to