This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ce8c291aa6e [fix] [broker] Fix break change: could not subscribe
partitioned topic with a suffix-matched regexp due to a mistake of PIP-145
(#21885)
ce8c291aa6e is described below
commit ce8c291aa6e835f3f54361b80b9c2e84a5bee0c4
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jan 15 20:41:35 2024 +0800
[fix] [broker] Fix break change: could not subscribe partitioned topic
with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
(cherry picked from commit 4ebbd2f5244ea2f8c0fd75e4dcb52055568b7fc7)
---
.../broker/auth/MockedPulsarServiceBaseTest.java | 16 ++-
.../client/impl/PatternTopicsConsumerImplTest.java | 107 +++++++++++++++++++++
.../org/apache/pulsar/common/topics/TopicList.java | 21 +++-
3 files changed, 139 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b888533d348..941a9356a86 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocat
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
@@ -55,9 +56,9 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -698,5 +699,16 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
}
+ public static void reconnectAllConnections(PulsarClientImpl c) throws
Exception { // Invokes ConnectionPool#closeAllConnections on the given client's pool via reflection.
+ ConnectionPool pool = c.getCnxPool();
+ Method closeAllConnections =
ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{}); // looked up by name, with no parameter types
+ closeAllConnections.setAccessible(true); // presumably the method is not public on ConnectionPool - confirm
+ closeAllConnections.invoke(pool, new Object[]{}); // NOTE(review): this only closes; the "reconnect" is presumably done by the client afterwards - confirm
+ }
+
+ public void reconnectAllConnections() throws Exception { // Instance overload: applies the static helper to this test's pulsarClient.
+ reconnectAllConnections((PulsarClientImpl) pulsarClient); // the cast requires pulsarClient to be a PulsarClientImpl
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index c708b4cae0a..9bcbdfed4c9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import java.util.regex.Pattern;
import java.util.stream.IntStream;
import io.netty.util.Timeout;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -679,6 +681,111 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
}
}
+ @DataProvider(name= "partitioned")
+ public Object[][] partitioned(){ // Two parameter rows, {true} and {false}, for tests declaring dataProvider = "partitioned".
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
+ @Test(timeOut = testTimeout, dataProvider = "partitioned")
+ public void testPreciseRegexpSubscribe(boolean partitioned) throws
Exception { // Subscribes with a "<topicName>$" regexp, creates the topic afterwards, and checks the consumer picks it up.
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscriptionName = "s1";
+ final Pattern pattern = Pattern.compile(String.format("%s$",
topicName)); // the regexp is the full topic name followed by "$"
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ // Use a large discovery period so that periodic auto-discovery is effectively disabled
+ // for the duration of this test (NOTE(review): the unit is not visible here - confirm).
+ .patternAutoDiscoveryPeriod(1000)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe(); // note: subscribed before the topic below is created
+
+ // 1. Create the topic (partitioned with a single partition, or non-partitioned).
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topicName, 1);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ // 2. Verify that the consumer subscribes to the newly created topic.
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 1);
+ if (partitioned) {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 1); // only the partitioned case is expected to report a partitioned topic
+ } else {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
+ }
+ });
+
+ // Cleanup: close the consumer, then delete the topic with the call matching how it was created.
+ consumer.close();
+ if (partitioned) {
+ admin.topics().deletePartitionedTopic(topicName);
+ } else {
+ admin.topics().delete(topicName);
+ }
+ }
+
+ @Test(timeOut = 240 * 1000, dataProvider = "partitioned")
+ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean
partitioned) throws Exception { // Same scenario as testPreciseRegexpSubscribe, but with broker-side pattern evaluation switched off.
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscriptionName = "s1";
+ final Pattern pattern = Pattern.compile(String.format("%s$",
topicName)); // the regexp is the full topic name followed by "$"
+
+ // Close all ServerCnx by closing the client-side sockets so that the config change takes effect.
+
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false);
+ reconnectAllConnections();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ // Disabling brokerSideSubscriptionPatternEvaluation also disables the topic list watcher,
+ // so set patternAutoDiscoveryPeriod to a small value.
+ .patternAutoDiscoveryPeriod(1)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe(); // note: subscribed before the topic below is created
+
+ // 1. Create the topic (partitioned with a single partition, or non-partitioned).
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topicName, 1);
+ } else {
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ // 2. Verify that the consumer subscribes to the newly created topic.
+ // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, wait for up to triple that (3 minutes).
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 1);
+ if (partitioned) {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 1); // only the partitioned case is expected to report a partitioned topic
+ } else {
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
+ }
+ });
+
+ // Cleanup: close the consumer, then delete the topic with the call matching how it was created.
+ consumer.close();
+ if (partitioned) {
+ admin.topics().deletePartitionedTopic(topicName);
+ } else {
+ admin.topics().delete(topicName);
+ }
+ // Re-enable the setting, then close all ServerCnx by closing the client-side sockets so that it takes effect.
+
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true);
+ reconnectAllConnections(); // NOTE(review): this restore is not in a finally block, so it is skipped if anything above throws
+ }
+
private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 250cea217ee..4c0a8d500b7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -47,13 +47,16 @@ public class TopicList {
}
public static List<String> filterTopics(List<String> original, Pattern
topicsPattern) {
- final Pattern shortenedTopicsPattern =
topicsPattern.toString().contains(SCHEME_SEPARATOR)
- ?
Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) :
topicsPattern;
+ final Pattern shortenedTopicsPattern =
Pattern.compile(removeTopicDomainScheme(topicsPattern.toString())); // now always recompiled from the regexp returned by removeTopicDomainScheme
return original.stream()
.map(TopicName::get)
+ .filter(topicName -> {
+ String partitionedTopicName =
topicName.getPartitionedTopicName(); // match on getPartitionedTopicName() instead of the raw topic string used by the removed filter
+ String removedScheme =
SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1]; // second piece of the split on the scheme separator
+ return
shortenedTopicsPattern.matcher(removedScheme).matches(); // matches() requires the whole string to match
+ })
.map(TopicName::toString)
- .filter(topic ->
shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches())
.collect(Collectors.toList());
}
@@ -78,4 +81,16 @@ public class TopicList {
s1.removeAll(list2);
return s1;
}
+
+ private static String removeTopicDomainScheme(String originalRegexp) { // Returns the regexp without the piece that precedes SCHEME_SEPARATOR.
+ if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
+ return originalRegexp; // no separator present: the regexp is returned unchanged
+ }
+ String removedTopicDomain =
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1]; // NOTE(review): keeps only the second piece; anything after a further separator is dropped, and fewer than two pieces would throw - confirm inputs
+ if (originalRegexp.contains("^")) { // NOTE(review): true for a caret anywhere in the regexp (e.g. inside [^...]), not only a leading anchor
+ return String.format("^%s", removedTopicDomain); // put a "^" in front of the shortened regexp
+ } else {
+ return removedTopicDomain;
+ }
+ }
}