This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a8882209475537fe730eefe26cffe0e4152d8433
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 11 23:33:08 2024 +0800

    [fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a 
quote pattern (#23014)
    
    (cherry picked from commit 7c0e82739215fbae9e21270d4c70c9a52dd3e403)
---
 .../apache/pulsar/broker/service/TopicGCTest.java  |  6 +--
 .../client/impl/PatternTopicsConsumerImplTest.java | 56 +++++++++++++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  6 ++-
 .../org/apache/pulsar/common/topics/TopicList.java | 20 ++++++--
 .../apache/pulsar/common/topics/TopicListTest.java | 58 +++++++++++++++++++++-
 5 files changed, 135 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index aabef91a7db..5f9833e1d81 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -38,8 +38,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -197,8 +197,8 @@ public class TopicGCTest extends ProducerConsumerBase {
             Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
             assertNotNull(msg, "Expected at least received 2 messages.");
             log.info("received msg[{}]: {}", i, msg.getValue());
-            MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId();
-            if (messageId.getPartitionIndex() == 1) {
+            TopicMessageIdImpl messageId = (TopicMessageIdImpl) 
msg.getMessageId();
+            if (messageId.getTopicPartitionName().equals(partition1)) {
                 consumer1.acknowledgeAsync(msg);
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 51f0ae817c2..4147673c6e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,8 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -1044,4 +1047,57 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName 
+ "-1").join(), Optional.empty());
         assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + 
"-2").join().isPresent());
     }
+
+    @Test(dataProvider = "partitioned")
+    public void testPatternQuote(boolean partitioned) throws Exception {
+        final NamespaceName namespace = NamespaceName.get("public/default");
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+        final LookupService lookup = client.getLookup();
+        List<String> expectedRes = new ArrayList<>();
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 2);
+            
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
+            
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
+            Collections.sort(expectedRes);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+            expectedRes.add(topicName);
+        }
+
+        // Verify 1: "java.util.regex.Pattern.quote".
+        String pattern1 = java.util.regex.Pattern.quote(topicName);
+        List<String> res1 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                        pattern1, 
null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res1);
+        assertEquals(res1, expectedRes);
+
+        // Verify 2: "com.google.re2j.Pattern.quote"
+        String pattern2 = com.google.re2j.Pattern.quote(topicName);
+        List<String> res2 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern2, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res2);
+        assertEquals(res2, expectedRes);
+
+        // Verify 3: "java.util.regex.Pattern.quote" & "^$"
+        String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
+        List<String> res3 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern3, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res3);
+        assertEquals(res3, expectedRes);
+
+        // Verify 4: "com.google.re2j.Pattern.quote" & "^$"
+        String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
+        List<String> res4 = lookup.getTopicsUnderNamespace(namespace, 
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                pattern4, null).join().getNonPartitionedOrPartitionTopics();
+        Collections.sort(res4);
+        assertEquals(res4, expectedRes);
+
+        // cleanup.
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName, false);
+        } else {
+            admin.topics().delete(topicName, false);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index c88e429733f..bf02232e0ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -49,6 +49,7 @@ import 
org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1269,19 +1270,20 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
                 .topicsPattern(topicName)
                 .subscriptionName("sub-issue-9585")
                 .subscribe();
+        PatternConsumerUpdateQueue taskQueue = 
WhiteboxImpl.getInternalState(consumer, "updateTaskQueue");
 
         Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
         Assert.assertEquals(consumer.getConsumers().size(), 3);
 
         admin.topics().deletePartitionedTopic(topicName, true);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+        taskQueue.appendRecheckOp();
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
             Assert.assertEquals(consumer.getConsumers().size(), 0);
         });
 
         admin.topics().createPartitionedTopic(topicName, 7);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+        taskQueue.appendRecheckOp();
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
             Assert.assertEquals(consumer.getConsumers().size(), 7);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index b231f21e598..380582edbde 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.topics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 
 @UtilityClass
@@ -82,15 +84,23 @@ public class TopicList {
         return s1;
     }
 
-    private static String removeTopicDomainScheme(String originalRegexp) {
+    @VisibleForTesting
+    static String removeTopicDomainScheme(String originalRegexp) {
         if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
             return originalRegexp;
         }
-        String removedTopicDomain = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
-        if (originalRegexp.contains("^")) {
-            return String.format("^%s", removedTopicDomain);
+        String[] parts = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
+        String prefix = parts[0];
+        String removedTopicDomain = parts[1];
+        if (prefix.equals(TopicDomain.persistent.value()) || 
prefix.equals(TopicDomain.non_persistent.value())) {
+            prefix = "";
+        } else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.non_persistent.value().length());
+        } else if (prefix.endsWith(TopicDomain.persistent.value())){
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.persistent.value().length());
         } else {
-            return removedTopicDomain;
+            throw new IllegalArgumentException("Does not support topic domain: 
" + prefix);
         }
+        return String.format("%s%s", prefix, removedTopicDomain);
     }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
index 9c3b54a0f0d..bb9e6a91279 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TopicListTest {
 
@@ -107,5 +108,60 @@ public class TopicListTest {
 
     }
 
-
+    @Test
+    public void testRemoveTopicDomainScheme() {
+        // persistent.
+        final String tpName1 = "persistent://public/default/tp";
+        String res1 = TopicList.removeTopicDomainScheme(tpName1);
+        assertEquals(res1, "public/default/tp");
+
+        // non-persistent
+        final String tpName2 = "non-persistent://public/default/tp";
+        String res2 = TopicList.removeTopicDomainScheme(tpName2);
+        assertEquals(res2, "public/default/tp");
+
+        // without topic domain.
+        final String tpName3 = "public/default/tp";
+        String res3 = TopicList.removeTopicDomainScheme(tpName3);
+        assertEquals(res3, "public/default/tp");
+
+        // persistent & "java.util.regex.Pattern.quote".
+        final String tpName4 = java.util.regex.Pattern.quote(tpName1);
+        String res4 = TopicList.removeTopicDomainScheme(tpName4);
+        assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));
+
+        // persistent & "java.util.regex.Pattern.quote" & "^$".
+        final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + 
"$";
+        String res5 = TopicList.removeTopicDomainScheme(tpName5);
+        assertEquals(res5, "^" + 
java.util.regex.Pattern.quote("public/default/tp") + "$");
+
+        // persistent & "com.google.re2j.Pattern.quote".
+        final String tpName6 = Pattern.quote(tpName1);
+        String res6 = TopicList.removeTopicDomainScheme(tpName6);
+        assertEquals(res6, Pattern.quote("public/default/tp"));
+
+        // non-persistent & "java.util.regex.Pattern.quote".
+        final String tpName7 = java.util.regex.Pattern.quote(tpName2);
+        String res7 = TopicList.removeTopicDomainScheme(tpName7);
+        assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));
+
+        // non-persistent & "com.google.re2j.Pattern.quote".
+        final String tpName8 = Pattern.quote(tpName2);
+        String res8 = TopicList.removeTopicDomainScheme(tpName8);
+        assertEquals(res8, Pattern.quote("public/default/tp"));
+
+        // non-persistent & "com.google.re2j.Pattern.quote" & "^$".
+        final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
+        String res9 = TopicList.removeTopicDomainScheme(tpName9);
+        assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");
+
+        // wrong topic domain.
+        final String tpName10 = "xx://public/default/tp";
+        try {
+            TopicList.removeTopicDomainScheme(tpName10);
+            fail("Does not support the topic domain xx");
+        } catch (Exception ex) {
+            // expected error.
+        }
+    }
 }

Reply via email to