This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 05ff9358be0 [fix][broker] Pattern subscription doesn't work when the 
pattern excludes the topic domain. (#24072)
05ff9358be0 is described below

commit 05ff9358be0c18a335c200fbe059925eeb31ea03
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Fri Mar 14 08:16:54 2025 +0800

    [fix][broker] Pattern subscription doesn't work when the pattern excludes 
the topic domain. (#24072)
    
    (cherry picked from commit 3bae1d1648edcedd03973476ef281715499a549e)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 +-
 .../pulsar/broker/service/TopicListService.java    |  3 +-
 .../broker/service/TopicListWatcherTest.java       |  2 +-
 .../client/impl/PatternTopicsConsumerImplTest.java | 34 ++++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  6 +++-
 .../org/apache/pulsar/common/topics/TopicList.java |  4 +--
 6 files changed, 45 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ca4fce4de68..93ffd6ebb27 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3143,7 +3143,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
 
         Pattern topicsPattern = 
Pattern.compile(commandWatchTopicList.hasTopicsPattern()
-                ? commandWatchTopicList.getTopicsPattern() : 
TopicList.ALL_TOPICS_PATTERN);
+                ? 
TopicList.removeTopicDomainScheme(commandWatchTopicList.getTopicsPattern())
+                : TopicList.ALL_TOPICS_PATTERN);
         String topicsHash = commandWatchTopicList.hasTopicsHash()
                 ? commandWatchTopicList.getTopicsHash() : null;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index b18286ee062..46bc3f6351b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -70,7 +70,8 @@ public class TopicListService {
          */
         @Override
         public void accept(String topicName, NotificationType 
notificationType) {
-            if 
(topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches())
 {
+            String partitionedTopicName = 
TopicName.get(topicName).getPartitionedTopicName();
+            if 
(topicsPattern.matcher(TopicList.removeTopicDomainScheme(partitionedTopicName)).matches())
 {
                 List<String> newTopics;
                 List<String> deletedTopics;
                 if (notificationType == NotificationType.Deleted) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
index c232675779f..330457c32b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
@@ -40,7 +40,7 @@ public class TopicListWatcherTest {
     );
 
     private static final long ID = 7;
-    private static final Pattern PATTERN = 
Pattern.compile("persistent://tenant/ns/topic\\d+");
+    private static final Pattern PATTERN = 
Pattern.compile("tenant/ns/topic\\d+");
 
 
     private TopicListService topicListService;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index a6b080cf6c7..70f9e084e9a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -684,6 +684,40 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testSubscribePatterWithOutTopicDomain() throws Exception {
+        final String key = "testSubscribePatterWithOutTopicDomain";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final Pattern pattern = 
Pattern.compile("my-property/my-ns/test-pattern.*");
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 0. Need make sure topic watcher started
+        waitForTopicListWatcherStarted(consumer);
+
+        // 1. create partition topic
+        String topicName = "persistent://my-property/my-ns/test-pattern" + key;
+        admin.topics().createPartitionedTopic(topicName, 4);
+
+        // 2. verify broker will push the changes to 
update(CommandWatchTopicUpdate).
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 4);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 4);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 1);
+        });
+
+        // cleanup.
+        consumer.close();
+        admin.topics().deletePartitionedTopic(topicName);
+        pulsarClient.close();
+    }
+
     @DataProvider(name= "regexpConsumerArgs")
     public Object[][] regexpConsumerArgs(){
         return new Object[][]{
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index caf726ef631..aa1538d6e1d 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -128,6 +128,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
     /**
      * Specify a pattern for topics(not contains the partition suffix) that 
this consumer subscribes to.
      *
+     * <p>Will ignore the topic domain("persistent://" or "non-persistent://") 
when pattern matching.
+     *
      * <p>The pattern is applied to subscribe to all topics, within a single 
namespace, that match the
      * pattern.
      *
@@ -143,7 +145,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * Specify a pattern for topics(not contains the partition suffix) that 
this consumer subscribes to.
      *
      * <p>It accepts a regular expression that is compiled into a pattern 
internally. E.g.,
-     * "persistent://public/default/pattern-topic-.*"
+     * "persistent://public/default/pattern-topic-.*" or 
"public/default/pattern-topic-.*"
+     *
+     * <p>Will ignore the topic domain("persistent://" or "non-persistent://") 
when pattern matching.
      *
      * <p>The pattern is applied to subscribe to all topics, within a single 
namespace, that match the
      * pattern.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 4dd48732225..7a5659c33b5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.common.topics;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -84,8 +83,7 @@ public class TopicList {
         return s1;
     }
 
-    @VisibleForTesting
-    static String removeTopicDomainScheme(String originalRegexp) {
+    public static String removeTopicDomainScheme(String originalRegexp) {
         if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
             return originalRegexp;
         }

Reply via email to