This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4a4e910771be3002ec7af499215c4671015c5612
Author: Penghui Li <peng...@apache.org>
AuthorDate: Wed Jul 2 18:58:39 2025 -0700

    [fix][client] NPE in MultiTopicsConsumerImpl.negativeAcknowledge (#24476)

    (cherry picked from commit 89167308709cf72d3f2731bad1bb30384f80abb4)
---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f5b89365f8f..451ddaf76db 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,6 +566,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(MessageId messageId) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - consumer is not ready (state: {})",
+                    topic, subscription, messageId, getState());
+            return;
+        }
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(messageId);
@@ -574,6 +579,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(Message<?> message) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - consumer is not ready (state: {})",
+                    topic, subscription, message.getMessageId(), getState());
+            return;
+        }
         MessageId messageId = message.getMessageId();
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());