This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fb06ac24cd033271032a6f24108949ad74027af1
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Jun 17 12:40:13 2025 +0800

    [fix][client] Prevent NPE when seeking with null topic in TopicMessageId (#24404)
    
    (cherry picked from commit 9337405aab40dc06423542dffe32a9ab3e971a76)
---
 .../pulsar/broker/service/SubscriptionSeekTest.java    | 18 ++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java    |  9 +++++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 4970dc88188..f5f16eaf223 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -64,6 +65,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.naming.TopicName;
@@ -465,6 +467,22 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testSeekWithNonOwnerTopicMessage() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/testNonOwnerTopicMessage";
+
+        admin.topics().createPartitionedTopic(topicName, 2);
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscription").subscribe();
+        assertThatThrownBy(
+                // seek with a TopicMessageIdImpl that has a null topic.
+                () -> consumer.seek(new TopicMessageIdImpl(null, new BatchMessageIdImpl(123L, 345L, 566, 789)))
+            )
+            .isInstanceOf(PulsarClientException.class)
+            .hasMessage("The owner topic is null");
+    }
+
     @Test
     public void testSeekTime() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8ab3b7ac7de..f30dc48dab7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -786,10 +786,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         final ConsumerImpl<T> internalConsumer;
         if (messageId instanceof TopicMessageId) {
             TopicMessageId topicMessageId = (TopicMessageId) messageId;
-            internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
+            String ownerTopic = topicMessageId.getOwnerTopic();
+            if (ownerTopic == null) {
+                return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
+                        "The owner topic is null"));
+            }
+            internalConsumer = consumers.get(ownerTopic);
             if (internalConsumer == null) {
                 return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
-                        "The owner topic " + topicMessageId.getOwnerTopic() + " is not subscribed"));
+                        "The owner topic " + ownerTopic + " is not subscribed"));
             }
         } else {
             internalConsumer = null;

Reply via email to