This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb06ac24cd033271032a6f24108949ad74027af1 Author: Ruimin MA <[email protected]> AuthorDate: Tue Jun 17 12:40:13 2025 +0800 [fix][client] Prevent NPE when seeking with null topic in TopicMessageId (#24404) (cherry picked from commit 9337405aab40dc06423542dffe32a9ab3e971a76) --- .../pulsar/broker/service/SubscriptionSeekTest.java | 18 ++++++++++++++++++ .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 9 +++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 4970dc88188..f5f16eaf223 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -64,6 +65,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.naming.TopicName; @@ -465,6 +467,22 @@ public class SubscriptionSeekTest extends BrokerTestBase { } } + @Test + public void testSeekWithNonOwnerTopicMessage() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testNonOwnerTopicMessage"; + + admin.topics().createPartitionedTopic(topicName, 2); + @Cleanup + org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-subscription").subscribe(); + assertThatThrownBy( + // seek with a TopicMessageIdImpl that has a null topic. + () -> consumer.seek(new TopicMessageIdImpl(null, new BatchMessageIdImpl(123L, 345L, 566, 789))) + ) + .isInstanceOf(PulsarClientException.class) + .hasMessage("The owner topic is null"); + } + @Test public void testSeekTime() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testSeekTime"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8ab3b7ac7de..f30dc48dab7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -786,10 +786,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { final ConsumerImpl<T> internalConsumer; if (messageId instanceof TopicMessageId) { TopicMessageId topicMessageId = (TopicMessageId) messageId; - internalConsumer = consumers.get(topicMessageId.getOwnerTopic()); + String ownerTopic = topicMessageId.getOwnerTopic(); + if (ownerTopic == null) { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "The owner topic is null")); + } + internalConsumer = consumers.get(ownerTopic); if (internalConsumer == null) { return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( - "The owner topic " + topicMessageId.getOwnerTopic() + " is not subscribed")); + "The owner topic " + ownerTopic + " is not subscribed")); } } else { internalConsumer = null;
