This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ce8c291aa6e  [fix] [broker] Fix break change: could not subscribe 
partitioned topic with a suffix-matched regexp due to a mistake of PIP-145 
(#21885)
ce8c291aa6e is described below

commit ce8c291aa6e835f3f54361b80b9c2e84a5bee0c4
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jan 15 20:41:35 2024 +0800

     [fix] [broker] Fix break change: could not subscribe partitioned topic 
with a suffix-matched regexp due to a mistake of PIP-145 (#21885)
    
    (cherry picked from commit 4ebbd2f5244ea2f8c0fd75e4dcb52055568b7fc7)
---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  16 ++-
 .../client/impl/PatternTopicsConsumerImplTest.java | 107 +++++++++++++++++++++
 .../org/apache/pulsar/common/topics/TopicList.java |  21 +++-
 3 files changed, 139 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b888533d348..941a9356a86 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocat
 import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -55,9 +56,9 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -698,5 +699,16 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         }
     }
 
+    public static void reconnectAllConnections(PulsarClientImpl c) throws 
Exception {
+        ConnectionPool pool = c.getCnxPool();
+        Method closeAllConnections = 
ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{});
+        closeAllConnections.setAccessible(true);
+        closeAllConnections.invoke(pool, new Object[]{});
+    }
+
+    public void reconnectAllConnections() throws Exception {
+        reconnectAllConnections((PulsarClientImpl) pulsarClient);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index c708b4cae0a..9bcbdfed4c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
 import io.netty.util.Timeout;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -679,6 +681,111 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
     }
 
+    @DataProvider(name= "partitioned")
+    public Object[][] partitioned(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(timeOut = testTimeout, dataProvider = "partitioned")
+    public void testPreciseRegexpSubscribe(boolean partitioned) throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscriptionName = "s1";
+        final Pattern pattern = Pattern.compile(String.format("%s$", 
topicName));
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                // Disable automatic discovery.
+                .patternAutoDiscoveryPeriod(1000)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. create topic.
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        // 2. verify consumer can subscribe the topic.
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 1);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 1);
+            if (partitioned) {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 1);
+            } else {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
+            }
+        });
+
+        // cleanup.
+        consumer.close();
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName);
+        } else {
+            admin.topics().delete(topicName);
+        }
+    }
+
+    @Test(timeOut = 240 * 1000, dataProvider = "partitioned")
+    public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean 
partitioned) throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscriptionName = "s1";
+        final Pattern pattern = Pattern.compile(String.format("%s$", 
topicName));
+
+        // Close all ServerCnx by close client-side sockets to make the config 
changes effect.
+        
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false);
+        reconnectAllConnections();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                // Disable brokerSideSubscriptionPatternEvaluation will 
leading disable topic list watcher.
+                // So set patternAutoDiscoveryPeriod to a little value.
+                .patternAutoDiscoveryPeriod(1)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. create topic.
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topicName, 1);
+        } else {
+            admin.topics().createNonPartitionedTopic(topicName);
+        }
+
+        // 2. verify consumer can subscribe the topic.
+        // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we 
set the test timeout to a triple value.
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 1);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 1);
+            if (partitioned) {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 1);
+            } else {
+                assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
+            }
+        });
+
+        // cleanup.
+        consumer.close();
+        if (partitioned) {
+            admin.topics().deletePartitionedTopic(topicName);
+        } else {
+            admin.topics().delete(topicName);
+        }
+        // Close all ServerCnx by close client-side sockets to make the config 
changes effect.
+        
pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true);
+        reconnectAllConnections();
+    }
+
     private PulsarClient createDelayWatchTopicsClient() throws Exception {
         ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
         return InjectedClientCnxClientBuilder.create(clientBuilder,
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 250cea217ee..4c0a8d500b7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -47,13 +47,16 @@ public class TopicList {
     }
     public static List<String> filterTopics(List<String> original, Pattern 
topicsPattern) {
 
-        final Pattern shortenedTopicsPattern = 
topicsPattern.toString().contains(SCHEME_SEPARATOR)
-                ? 
Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : 
topicsPattern;
+        final Pattern shortenedTopicsPattern = 
Pattern.compile(removeTopicDomainScheme(topicsPattern.toString()));
 
         return original.stream()
                 .map(TopicName::get)
+                .filter(topicName -> {
+                    String partitionedTopicName = 
topicName.getPartitionedTopicName();
+                    String removedScheme = 
SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
+                    return 
shortenedTopicsPattern.matcher(removedScheme).matches();
+                })
                 .map(TopicName::toString)
-                .filter(topic -> 
shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches())
                 .collect(Collectors.toList());
     }
 
@@ -78,4 +81,16 @@ public class TopicList {
         s1.removeAll(list2);
         return s1;
     }
+
+    private static String removeTopicDomainScheme(String originalRegexp) {
+        if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
+            return originalRegexp;
+        }
+        String removedTopicDomain = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
+        if (originalRegexp.contains("^")) {
+            return String.format("^%s", removedTopicDomain);
+        } else {
+            return removedTopicDomain;
+        }
+    }
 }

Reply via email to