This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c0029d7381e9b5a91fad35102044bb81ebffc158
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 10 18:08:48 2024 +0800

    [fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a 
quote pattern (#23014)
    
    (cherry picked from commit 7c0e82739215fbae9e21270d4c70c9a52dd3e403)
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 56 +++++++++++++++++++++
 .../org/apache/pulsar/common/topics/TopicList.java | 20 ++++++--
 .../apache/pulsar/common/topics/TopicListTest.java | 58 +++++++++++++++++++++-
 3 files changed, 128 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index d443ca44a7e..04bfb03bf46 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,8 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -1113,4 +1116,57 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName 
+ "-1").join(), Optional.empty());
         assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + 
"-2").join().isPresent());
     }
+
+    @Test(dataProvider = "partitioned")
+    public void testPatternQuote(boolean partitioned) throws Exception {
+        final NamespaceName namespace = NamespaceName.get("public/default");
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+        final LookupService lookup = client.getLookup();
+        List<String> expectedRes = new ArrayList<>();
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 2);
+            
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
+            
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
+            Collections.sort(expectedRes);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+            expectedRes.add(topicName);
+        }
+
+        // Verify 1: "java.util.regex.Pattern.quote".
+        String pattern1 = java.util.regex.Pattern.quote(topicName);
+        List<String> res1 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                        pattern1, 
null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res1);
+        assertEquals(res1, expectedRes);
+
+        // Verify 2: "com.google.re2j.Pattern.quote"
+        String pattern2 = com.google.re2j.Pattern.quote(topicName);
+        List<String> res2 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern2, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res2);
+        assertEquals(res2, expectedRes);
+
+        // Verify 3: "java.util.regex.Pattern.quote" & "^$"
+        String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
+        List<String> res3 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern3, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res3);
+        assertEquals(res3, expectedRes);
+
+        // Verify 4: "com.google.re2j.Pattern.quote" & "^$"
+        String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
+        List<String> res4 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern4, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res4);
+        assertEquals(res4, expectedRes);
+
+        // cleanup.
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName, false);
+        } else {
+            admin.topics().delete(topicName, false);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 4c0a8d500b7..4dd48732225 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.topics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 
 @UtilityClass
@@ -82,15 +84,23 @@ public class TopicList {
         return s1;
     }
 
-    private static String removeTopicDomainScheme(String originalRegexp) {
+    @VisibleForTesting
+    static String removeTopicDomainScheme(String originalRegexp) {
         if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
             return originalRegexp;
         }
-        String removedTopicDomain = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
-        if (originalRegexp.contains("^")) {
-            return String.format("^%s", removedTopicDomain);
+        String[] parts = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
+        String prefix = parts[0];
+        String removedTopicDomain = parts[1];
+        if (prefix.equals(TopicDomain.persistent.value()) || 
prefix.equals(TopicDomain.non_persistent.value())) {
+            prefix = "";
+        } else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.non_persistent.value().length());
+        } else if (prefix.endsWith(TopicDomain.persistent.value())){
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.persistent.value().length());
         } else {
-            return removedTopicDomain;
+            throw new IllegalArgumentException("Does not support topic domain: 
" + prefix);
         }
+        return String.format("%s%s", prefix, removedTopicDomain);
     }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
index 9069dd6dcc7..b3a7536ebff 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TopicListTest {
 
@@ -107,5 +108,60 @@ public class TopicListTest {
 
     }
 
-
+    @Test
+    public void testRemoveTopicDomainScheme() {
+        // persistent.
+        final String tpName1 = "persistent://public/default/tp";
+        String res1 = TopicList.removeTopicDomainScheme(tpName1);
+        assertEquals(res1, "public/default/tp");
+
+        // non-persistent
+        final String tpName2 = "non-persistent://public/default/tp";
+        String res2 = TopicList.removeTopicDomainScheme(tpName2);
+        assertEquals(res2, "public/default/tp");
+
+        // without topic domain.
+        final String tpName3 = "public/default/tp";
+        String res3 = TopicList.removeTopicDomainScheme(tpName3);
+        assertEquals(res3, "public/default/tp");
+
+        // persistent & "java.util.regex.Pattern.quote".
+        final String tpName4 = java.util.regex.Pattern.quote(tpName1);
+        String res4 = TopicList.removeTopicDomainScheme(tpName4);
+        assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));
+
+        // persistent & "java.util.regex.Pattern.quote" & "^$".
+        final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + 
"$";
+        String res5 = TopicList.removeTopicDomainScheme(tpName5);
+        assertEquals(res5, "^" + 
java.util.regex.Pattern.quote("public/default/tp") + "$");
+
+        // persistent & "com.google.re2j.Pattern.quote".
+        final String tpName6 = Pattern.quote(tpName1);
+        String res6 = TopicList.removeTopicDomainScheme(tpName6);
+        assertEquals(res6, Pattern.quote("public/default/tp"));
+
+        // non-persistent & "java.util.regex.Pattern.quote".
+        final String tpName7 = java.util.regex.Pattern.quote(tpName2);
+        String res7 = TopicList.removeTopicDomainScheme(tpName7);
+        assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));
+
+        // non-persistent & "com.google.re2j.Pattern.quote".
+        final String tpName8 = Pattern.quote(tpName2);
+        String res8 = TopicList.removeTopicDomainScheme(tpName8);
+        assertEquals(res8, Pattern.quote("public/default/tp"));
+
+        // non-persistent & "com.google.re2j.Pattern.quote" & "^$".
+        final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
+        String res9 = TopicList.removeTopicDomainScheme(tpName9);
+        assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");
+
+        // wrong topic domain.
+        final String tpName10 = "xx://public/default/tp";
+        try {
+            TopicList.removeTopicDomainScheme(tpName10);
+            fail("Does not support the topic domain xx");
+        } catch (Exception ex) {
+            // expected error.
+        }
+    }
 }

Reply via email to