This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 569386640ab [fix] [broker] Fix can not subscribe partitioned topic 
with a suffix-matched regexp (#22025)
569386640ab is described below

commit 569386640ab6205781e8afa89a57fb539292fcec
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 19 16:43:39 2024 +0800

    [fix] [broker] Fix can not subscribe partitioned topic with a 
suffix-matched regexp (#22025)
---
 .../pulsar/broker/resources/TopicResources.java    |  3 ++
 .../pulsar/broker/namespace/NamespaceService.java  |  3 ++
 .../broker/service/PulsarCommandSenderImpl.java    |  6 ++++
 .../pulsar/broker/service/TopicListService.java    | 22 ++++++++++++--
 .../client/impl/PatternTopicsConsumerImplTest.java | 34 ++++++++++++++++++----
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  8 ++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 10 +++++--
 .../impl/PatternMultiTopicsConsumerImpl.java       | 24 +++++++++++++--
 .../pulsar/client/impl/TopicListWatcher.java       |  4 ++-
 .../impl/conf/ConsumerConfigurationData.java       |  2 +-
 .../apache/pulsar/common/protocol/Commands.java    |  7 +++++
 11 files changed, 104 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 840ced0a1c1..0963f25c3d3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -50,6 +50,9 @@ public class TopicResources {
         store.registerListener(this::handleNotification);
     }
 
+    /**
+     * List the names of the persistent topics under a namespace; each topic name contains the partition suffix.
+     */
     public CompletableFuture<List<String>> 
listPersistentTopicsAsync(NamespaceName ns) {
         String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d8c3fd169a2..b55eda150af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1432,6 +1432,9 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
+    /**
+     * List the names of the persistent topics under a namespace; each topic name contains the partition suffix.
+     */
     public CompletableFuture<List<String>> 
getListOfPersistentTopics(NamespaceName namespaceName) {
         return 
pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index dd74fc4e71e..105650caaaf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -356,12 +356,18 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         writeAndFlush(outBuf);
     }
 
+    /**
+     * @param topics the matching topic names; each topic name contains the partition suffix.
+     */
     @Override
     public void sendWatchTopicListSuccess(long requestId, long watcherId, 
String topicsHash, List<String> topics) {
         BaseCommand command = Commands.newWatchTopicListSuccess(requestId, 
watcherId, topicsHash, topics);
         interceptAndWriteCommand(command);
     }
 
+    /***
+     * {@inheritDoc}
+     */
     @Override
     public void sendWatchTopicListUpdate(long watcherId,
                                          List<String> newTopics, List<String> 
deletedTopics, String topicsHash) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 7aa50057d73..aea5b9fc65b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -42,11 +43,16 @@ public class TopicListService {
 
     public static class TopicListWatcher implements BiConsumer<String, 
NotificationType> {
 
+        /** The matching topic names; each topic name contains the partition suffix. */
         private final List<String> matchingTopics;
         private final TopicListService topicListService;
         private final long id;
+        /** The regexp for the topic name (without the partition suffix). */
         private final Pattern topicsPattern;
 
+        /**
+         * @param topicsPattern the regexp for the topic name (without the partition suffix).
+         */
         public TopicListWatcher(TopicListService topicListService, long id,
                                 Pattern topicsPattern, List<String> topics) {
             this.topicListService = topicListService;
@@ -59,9 +65,12 @@ public class TopicListService {
             return matchingTopics;
         }
 
+        /***
+         * @param topicName topic name which contains partition suffix.
+         */
         @Override
         public void accept(String topicName, NotificationType 
notificationType) {
-            if (topicsPattern.matcher(topicName).matches()) {
+            if 
(topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches())
 {
                 List<String> newTopics;
                 List<String> deletedTopics;
                 if (notificationType == NotificationType.Deleted) {
@@ -109,6 +118,9 @@ public class TopicListService {
         }
     }
 
+    /**
+     * @param topicsPattern the regexp for the topic name (without the partition suffix).
+     */
     public void handleWatchTopicList(NamespaceName namespaceName, long 
watcherId, long requestId, Pattern topicsPattern,
                                      String topicsHash, Semaphore 
lookupSemaphore) {
 
@@ -184,7 +196,9 @@ public class TopicListService {
                 });
     }
 
-
+    /**
+     * @param topicsPattern the regexp for the topic name (without the partition suffix).
+     */
     public void 
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
             NamespaceName namespace, long watcherId, Pattern topicsPattern) {
         namespaceService.getListOfPersistentTopics(namespace).
@@ -246,6 +260,10 @@ public class TopicListService {
         log.info("[{}] Closed watcher, watcherId={}", 
connection.getRemoteAddress(), watcherId);
     }
 
+    /**
+     * @param deletedTopics names of the deleted topics (each contains the partition suffix).
+     * @param newTopics names of the added topics (each contains the partition suffix).
+     */
     public void sendTopicListUpdate(long watcherId, String topicsHash, 
List<String> deletedTopics,
                                     List<String> newTopics) {
         connection.getCommandSender().sendWatchTopicListUpdate(watcherId, 
newTopics, deletedTopics, topicsHash);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9bcbdfed4c9..7707abafde8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -681,16 +681,27 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
     }
 
-    @DataProvider(name= "partitioned")
-    public Object[][] partitioned(){
+    @DataProvider(name= "regexpConsumerArgs")
+    public Object[][] regexpConsumerArgs(){
         return new Object[][]{
-                {true},
-                {false}
+                {true, true},
+                {true, false},
+                {false, true},
+                {false, false}
         };
     }
 
-    @Test(timeOut = testTimeout, dataProvider = "partitioned")
-    public void testPreciseRegexpSubscribe(boolean partitioned) throws 
Exception {
+    private void waitForTopicListWatcherStarted(Consumer consumer) {
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture completableFuture = 
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+            log.info("isDone: {}, isCompletedExceptionally: {}", 
completableFuture.isDone(),
+                    completableFuture.isCompletedExceptionally());
+            assertTrue(completableFuture.isDone() && 
!completableFuture.isCompletedExceptionally());
+        });
+    }
+
+    @Test(timeOut = testTimeout, dataProvider = "regexpConsumerArgs")
+    public void testPreciseRegexpSubscribe(boolean partitioned, boolean 
createTopicAfterWatcherStarted) throws Exception {
         final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         final String subscriptionName = "s1";
         final Pattern pattern = Pattern.compile(String.format("%s$", 
topicName));
@@ -704,6 +715,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
                 .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                 .receiverQueueSize(4)
                 .subscribe();
+        if (createTopicAfterWatcherStarted) {
+            waitForTopicListWatcherStarted(consumer);
+        }
 
         // 1. create topic.
         if (partitioned) {
@@ -733,6 +747,14 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
     }
 
+    @DataProvider(name= "partitioned")
+    public Object[][] partitioned(){
+        return new Object[][]{
+                {true},  // run the test against a partitioned topic
+                {false}  // run the test against a non-partitioned topic (was a duplicated {true})
+        };
+    }
+
     @Test(timeOut = 240 * 1000, dataProvider = "partitioned")
     public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean 
partitioned) throws Exception {
         final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 01f205a3afd..ddb414a72d1 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -126,7 +126,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> topics(List<String> topicNames);
 
     /**
-     * Specify a pattern for topics that this consumer subscribes to.
+     * Specify a pattern for topics (topic names without the partition suffix) that this consumer subscribes to.
      *
      * <p>The pattern is applied to subscribe to all topics, within a single 
namespace, that match the
      * pattern.
@@ -134,13 +134,13 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * <p>The consumer automatically subscribes to topics created after itself.
      *
      * @param topicsPattern
-     *            a regular expression to select a list of topics to subscribe 
to
+     *            a regular expression to select the topics (names without the partition suffix) to subscribe to
      * @return the consumer builder instance
      */
     ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
 
     /**
-     * Specify a pattern for topics that this consumer subscribes to.
+     * Specify a pattern for topics (topic names without the partition suffix) that this consumer subscribes to.
      *
      * <p>It accepts a regular expression that is compiled into a pattern 
internally. E.g.,
      * "persistent://public/default/pattern-topic-.*"
@@ -151,7 +151,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * <p>The consumer automatically subscribes to topics created after itself.
      *
      * @param topicsPattern
-     *            given regular expression for topics pattern
+     *            given regular expression for the topics pattern (topic names without the partition suffix)
      * @return the consumer builder instance
      */
     ConsumerBuilder<T> topicsPattern(String topicsPattern);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 84504b632ad..d18af475d61 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -929,7 +929,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
     }
 
-    // subscribe one more given topic
+    /***
+     * Subscribe one more given topic.
+     * @param topicName topic name without the partition suffix.
+     */
     public CompletableFuture<Void> subscribeAsync(String topicName, boolean 
createTopicIfDoesNotExist) {
         TopicName topicNameInstance = getTopicName(topicName);
         if (topicNameInstance == null) {
@@ -1251,7 +1254,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return unsubscribeFuture;
     }
 
-    // Remove a consumer for a topic
+    /**
+     * Remove a consumer for a topic.
+     * @param topicName the topic name, which contains the partition suffix.
+     */
     public CompletableFuture<Void> removeConsumerAsync(String topicName) {
         checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + 
topicName);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f3ebcdee6c0..4d179f7d914 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -67,6 +67,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     private volatile Timeout recheckPatternTimeout = null;
     private volatile String topicsHash;
 
+    /**
+     * @param topicsPattern the regexp for the topic name (without the partition suffix).
+     */
     public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
                                           String topicsHash,
                                           PulsarClientImpl client,
@@ -220,14 +223,26 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     }
 
     interface TopicsChangedListener {
-        // unsubscribe and delete ConsumerImpl in the `consumers` map in 
`MultiTopicsConsumerImpl` based on added topics
+        /**
+         * Unsubscribe and delete the {@link ConsumerImpl} instances in the {@link MultiTopicsConsumerImpl#consumers}
+         * map in {@link MultiTopicsConsumerImpl}.
+         * @param removedTopics names of the removed topics (each contains the partition suffix).
+         */
         CompletableFuture<Void> onTopicsRemoved(Collection<String> 
removedTopics);
-        // subscribe and create a list of new ConsumerImpl, added them to the 
`consumers` map in
-        // `MultiTopicsConsumerImpl`.
+
+        /**
+         * Subscribe and create a list of new {@link ConsumerImpl} instances, then add them to the
+         * {@link MultiTopicsConsumerImpl#consumers} map in {@link MultiTopicsConsumerImpl}.
+         * @param addedTopics names of the added topics (each contains the partition suffix).
+         */
         CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
     }
 
     private class PatternTopicsChangedListener implements 
TopicsChangedListener {
+
+        /**
+         * {@inheritDoc}
+         */
         @Override
         public CompletableFuture<Void> onTopicsRemoved(Collection<String> 
removedTopics) {
             CompletableFuture<Void> removeFuture = new CompletableFuture<>();
@@ -249,6 +264,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             return removeFuture;
         }
 
+        /**
+         * {@inheritDoc}
+         */
         @Override
         public CompletableFuture<Void> onTopicsAdded(Collection<String> 
addedTopics) {
             CompletableFuture<Void> addFuture = new CompletableFuture<>();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 489a07a606e..86adf69f06e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -59,6 +59,9 @@ public class TopicListWatcher extends HandlerState implements 
ConnectionHandler.
     private final Runnable recheckTopicsChangeAfterReconnect;
 
 
+    /**
+     * @param topicsPattern the regexp for the topic name (without the partition suffix).
+     */
     public 
TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener,
                             PulsarClientImpl client, Pattern topicsPattern, 
long watcherId,
                             NamespaceName namespace, String topicsHash,
@@ -142,7 +145,6 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
                                 return;
                             }
                         }
-
                         this.connectionHandler.resetBackoff();
 
                         recheckTopicsChangeAfterReconnect.run();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 8760926792c..3ae0e977d13 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -65,7 +65,7 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     @ApiModelProperty(
             name = "topicsPattern",
-            value = "Topic pattern"
+            value = "The regexp for the topic name(not contains partition 
suffix)."
     )
     private Pattern topicsPattern;
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 34d47e2836b..65674af0ae1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1585,6 +1585,9 @@ public class Commands {
         return cmd;
     }
 
+    /**
+     * @param topics the matching topic names; each topic name contains the partition suffix.
+     */
     public static BaseCommand newWatchTopicListSuccess(long requestId, long 
watcherId, String topicsHash,
                                                        List<String> topics) {
         BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
@@ -1600,6 +1603,10 @@ public class Commands {
         return cmd;
     }
 
+    /**
+     * @param deletedTopics names of the deleted topics (each contains the partition suffix).
+     * @param newTopics names of the added topics (each contains the partition suffix).
+     */
     public static BaseCommand newWatchTopicUpdate(long watcherId,
                                               List<String> newTopics, 
List<String> deletedTopics, String topicsHash) {
         BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);

Reply via email to