This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c0e8273921 [fix][broker]Fix lookupService.getTopicsUnderNamespace can
not work with a quote pattern (#23014)
7c0e8273921 is described below
commit 7c0e82739215fbae9e21270d4c70c9a52dd3e403
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 10 18:08:48 2024 +0800
[fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a
quote pattern (#23014)
---
.../client/impl/PatternTopicsConsumerImplTest.java | 56 +++++++++++++++++++++
.../org/apache/pulsar/common/topics/TopicList.java | 20 ++++++--
.../apache/pulsar/common/topics/TopicListTest.java | 58 +++++++++++++++++++++-
3 files changed, 128 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9c19fadffb1..4823426c8b8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,8 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -49,6 +51,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -1114,4 +1117,57 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName
+ "-1").join(), Optional.empty());
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName +
"-2").join().isPresent());
}
+
+ @Test(dataProvider = "partitioned")
+ public void testPatternQuote(boolean partitioned) throws Exception {
+ final NamespaceName namespace = NamespaceName.get("public/default");
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+ final LookupService lookup = client.getLookup();
+ List<String> expectedRes = new ArrayList<>();
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topicName, 2);
+
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
+
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
+ Collections.sort(expectedRes);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ expectedRes.add(topicName);
+ }
+
+ // Verify 1: "java.util.regex.Pattern.quote".
+ String pattern1 = java.util.regex.Pattern.quote(topicName);
+ List<String> res1 = lookup.getTopicsUnderNamespace(namespace,
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+ pattern1,
null).join().getNonPartitionedOrPartitionTopics();
+ Collections.sort(res1);
+ assertEquals(res1, expectedRes);
+
+ // Verify 2: "com.google.re2j.Pattern.quote"
+ String pattern2 = com.google.re2j.Pattern.quote(topicName);
+ List<String> res2 = lookup.getTopicsUnderNamespace(namespace,
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+ pattern2, null).join().getNonPartitionedOrPartitionTopics();
+ Collections.sort(res2);
+ assertEquals(res2, expectedRes);
+
+ // Verify 3: "java.util.regex.Pattern.quote" & "^$"
+ String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
+ List<String> res3 = lookup.getTopicsUnderNamespace(namespace,
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+ pattern3, null).join().getNonPartitionedOrPartitionTopics();
+ Collections.sort(res3);
+ assertEquals(res3, expectedRes);
+
+ // Verify 4: "com.google.re2j.Pattern.quote" & "^$"
+ String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
+ List<String> res4 = lookup.getTopicsUnderNamespace(namespace,
CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+ pattern4, null).join().getNonPartitionedOrPartitionTopics();
+ Collections.sort(res4);
+ assertEquals(res4, expectedRes);
+
+ // cleanup.
+ if (partitioned) {
+ admin.topics().deletePartitionedTopic(topicName, false);
+ } else {
+ admin.topics().delete(topicName, false);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index e8a485b844d..9e24483df82 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.topics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import com.google.re2j.Pattern;
import java.nio.charset.StandardCharsets;
@@ -28,6 +29,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@UtilityClass
@@ -83,15 +85,23 @@ public class TopicList {
return s1;
}
- private static String removeTopicDomainScheme(String originalRegexp) {
+ @VisibleForTesting
+ static String removeTopicDomainScheme(String originalRegexp) {
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
return originalRegexp;
}
- String removedTopicDomain =
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
- if (originalRegexp.contains("^")) {
- return String.format("^%s", removedTopicDomain);
+ String[] parts =
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
+ String prefix = parts[0];
+ String removedTopicDomain = parts[1];
+ if (prefix.equals(TopicDomain.persistent.value()) ||
prefix.equals(TopicDomain.non_persistent.value())) {
+ prefix = "";
+ } else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
+ prefix = prefix.substring(0, prefix.length() -
TopicDomain.non_persistent.value().length());
+ } else if (prefix.endsWith(TopicDomain.persistent.value())){
+ prefix = prefix.substring(0, prefix.length() -
TopicDomain.persistent.value().length());
} else {
- return removedTopicDomain;
+ throw new IllegalArgumentException("Does not support topic domain:
" + prefix);
}
+ return String.format("%s%s", prefix, removedTopicDomain);
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
index a83ef2ac8c7..7bcdacb2e9b 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java
@@ -30,6 +30,7 @@ import java.util.stream.Stream;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class TopicListTest {
@@ -107,5 +108,60 @@ public class TopicListTest {
}
-
+ @Test
+ public void testRemoveTopicDomainScheme() {
+ // persistent.
+ final String tpName1 = "persistent://public/default/tp";
+ String res1 = TopicList.removeTopicDomainScheme(tpName1);
+ assertEquals(res1, "public/default/tp");
+
+ // non-persistent
+ final String tpName2 = "non-persistent://public/default/tp";
+ String res2 = TopicList.removeTopicDomainScheme(tpName2);
+ assertEquals(res2, "public/default/tp");
+
+ // without topic domain.
+ final String tpName3 = "public/default/tp";
+ String res3 = TopicList.removeTopicDomainScheme(tpName3);
+ assertEquals(res3, "public/default/tp");
+
+ // persistent & "java.util.regex.Pattern.quote".
+ final String tpName4 = java.util.regex.Pattern.quote(tpName1);
+ String res4 = TopicList.removeTopicDomainScheme(tpName4);
+ assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));
+
+ // persistent & "java.util.regex.Pattern.quote" & "^$".
+ final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) +
"$";
+ String res5 = TopicList.removeTopicDomainScheme(tpName5);
+ assertEquals(res5, "^" +
java.util.regex.Pattern.quote("public/default/tp") + "$");
+
+ // persistent & "com.google.re2j.Pattern.quote".
+ final String tpName6 = Pattern.quote(tpName1);
+ String res6 = TopicList.removeTopicDomainScheme(tpName6);
+ assertEquals(res6, Pattern.quote("public/default/tp"));
+
+ // non-persistent & "java.util.regex.Pattern.quote".
+ final String tpName7 = java.util.regex.Pattern.quote(tpName2);
+ String res7 = TopicList.removeTopicDomainScheme(tpName7);
+ assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));
+
+ // non-persistent & "com.google.re2j.Pattern.quote".
+ final String tpName8 = Pattern.quote(tpName2);
+ String res8 = TopicList.removeTopicDomainScheme(tpName8);
+ assertEquals(res8, Pattern.quote("public/default/tp"));
+
+ // non-persistent & "com.google.re2j.Pattern.quote" & "^$".
+ final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
+ String res9 = TopicList.removeTopicDomainScheme(tpName9);
+ assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");
+
+ // wrong topic domain.
+ final String tpName10 = "xx://public/default/tp";
+ try {
+ TopicList.removeTopicDomainScheme(tpName10);
+ fail("Does not support the topic domain xx");
+ } catch (Exception ex) {
+ // expected error.
+ }
+ }
}