This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 05ff9358be0 [fix][broker] Pattern subscription doesn't work when the pattern excludes the topic domain. (#24072) 05ff9358be0 is described below commit 05ff9358be0c18a335c200fbe059925eeb31ea03 Author: Baodi Shi <ba...@apache.org> AuthorDate: Fri Mar 14 08:16:54 2025 +0800 [fix][broker] Pattern subscription doesn't work when the pattern excludes the topic domain. (#24072) (cherry picked from commit 3bae1d1648edcedd03973476ef281715499a549e) --- .../apache/pulsar/broker/service/ServerCnx.java | 3 +- .../pulsar/broker/service/TopicListService.java | 3 +- .../broker/service/TopicListWatcherTest.java | 2 +- .../client/impl/PatternTopicsConsumerImplTest.java | 34 ++++++++++++++++++++++ .../apache/pulsar/client/api/ConsumerBuilder.java | 6 +++- .../org/apache/pulsar/common/topics/TopicList.java | 4 +-- 6 files changed, 45 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ca4fce4de68..93ffd6ebb27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -3143,7 +3143,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); Pattern topicsPattern = Pattern.compile(commandWatchTopicList.hasTopicsPattern() - ? commandWatchTopicList.getTopicsPattern() : TopicList.ALL_TOPICS_PATTERN); + ? TopicList.removeTopicDomainScheme(commandWatchTopicList.getTopicsPattern()) + : TopicList.ALL_TOPICS_PATTERN); String topicsHash = commandWatchTopicList.hasTopicsHash() ? commandWatchTopicList.getTopicsHash() : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index b18286ee062..46bc3f6351b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -70,7 +70,8 @@ public class TopicListService { */ @Override public void accept(String topicName, NotificationType notificationType) { - if (topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches()) { + String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName(); + if (topicsPattern.matcher(TopicList.removeTopicDomainScheme(partitionedTopicName)).matches()) { List<String> newTopics; List<String> deletedTopics; if (notificationType == NotificationType.Deleted) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java index c232675779f..330457c32b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java @@ -40,7 +40,7 @@ public class TopicListWatcherTest { ); private static final long ID = 7; - private static final Pattern PATTERN = Pattern.compile("persistent://tenant/ns/topic\\d+"); + private static final Pattern PATTERN = Pattern.compile("tenant/ns/topic\\d+"); private TopicListService topicListService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index a6b080cf6c7..70f9e084e9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -684,6 +684,40 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { } } + @Test(timeOut = testTimeout) + public void testSubscribePatterWithOutTopicDomain() throws Exception { + final String key = "testSubscribePatterWithOutTopicDomain"; + final String subscriptionName = "my-ex-subscription-" + key; + final Pattern pattern = Pattern.compile("my-property/my-ns/test-pattern.*"); + + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(4) + .subscribe(); + + // 0. Need make sure topic watcher started + waitForTopicListWatcherStarted(consumer); + + // 1. create partition topic + String topicName = "persistent://my-property/my-ns/test-pattern" + key; + admin.topics().createPartitionedTopic(topicName, 4); + + // 2. verify broker will push the changes to update(CommandWatchTopicUpdate). + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1); + }); + + // cleanup. + consumer.close(); + admin.topics().deletePartitionedTopic(topicName); + pulsarClient.close(); + } + @DataProvider(name= "regexpConsumerArgs") public Object[][] regexpConsumerArgs(){ return new Object[][]{ diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index caf726ef631..aa1538d6e1d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -128,6 +128,8 @@ public interface ConsumerBuilder<T> extends Cloneable { /** * Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to. * + * <p>Will ignore the topic domain("persistent://" or "non-persistent://") when pattern matching. + * * <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the * pattern. * @@ -143,7 +145,9 @@ public interface ConsumerBuilder<T> extends Cloneable { * Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to. * * <p>It accepts a regular expression that is compiled into a pattern internally. E.g., - * "persistent://public/default/pattern-topic-.*" + * "persistent://public/default/pattern-topic-.*" or "public/default/pattern-topic-.*" + * + * <p>Will ignore the topic domain("persistent://" or "non-persistent://") when pattern matching. * * <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the * pattern. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index 4dd48732225..7a5659c33b5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.common.topics; -import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -84,8 +83,7 @@ public class TopicList { return s1; } - @VisibleForTesting - static String removeTopicDomainScheme(String originalRegexp) { + public static String removeTopicDomainScheme(String originalRegexp) { if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) { return originalRegexp; }