This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 569386640ab [fix] [broker] Fix can not subscribe partitioned topic
with a suffix-matched regexp (#22025)
569386640ab is described below
commit 569386640ab6205781e8afa89a57fb539292fcec
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 19 16:43:39 2024 +0800
[fix] [broker] Fix can not subscribe partitioned topic with a
suffix-matched regexp (#22025)
---
.../pulsar/broker/resources/TopicResources.java | 3 ++
.../pulsar/broker/namespace/NamespaceService.java | 3 ++
.../broker/service/PulsarCommandSenderImpl.java | 6 ++++
.../pulsar/broker/service/TopicListService.java | 22 ++++++++++++--
.../client/impl/PatternTopicsConsumerImplTest.java | 34 ++++++++++++++++++----
.../apache/pulsar/client/api/ConsumerBuilder.java | 8 ++---
.../client/impl/MultiTopicsConsumerImpl.java | 10 +++++--
.../impl/PatternMultiTopicsConsumerImpl.java | 24 +++++++++++++--
.../pulsar/client/impl/TopicListWatcher.java | 4 ++-
.../impl/conf/ConsumerConfigurationData.java | 2 +-
.../apache/pulsar/common/protocol/Commands.java | 7 +++++
11 files changed, 104 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 840ced0a1c1..0963f25c3d3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -50,6 +50,9 @@ public class TopicResources {
store.registerListener(this::handleNotification);
}
+ /***
+ * List the names of the persistent topics under a namespace; each topic name includes
the partition suffix.
+ */
public CompletableFuture<List<String>>
listPersistentTopicsAsync(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d8c3fd169a2..b55eda150af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1432,6 +1432,9 @@ public class NamespaceService implements AutoCloseable {
});
}
+ /***
+ * List the names of the persistent topics under a namespace; each topic name includes
the partition suffix.
+ */
public CompletableFuture<List<String>>
getListOfPersistentTopics(NamespaceName namespaceName) {
return
pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index dd74fc4e71e..105650caaaf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -356,12 +356,18 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
writeAndFlush(outBuf);
}
+ /***
+ * @param topics topic names which are matching, the topic name contains
the partition suffix.
+ */
@Override
public void sendWatchTopicListSuccess(long requestId, long watcherId,
String topicsHash, List<String> topics) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId,
watcherId, topicsHash, topics);
interceptAndWriteCommand(command);
}
+ /***
+ * {@inheritDoc}
+ */
@Override
public void sendWatchTopicListUpdate(long watcherId,
List<String> newTopics, List<String>
deletedTopics, String topicsHash) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 7aa50057d73..aea5b9fc65b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -42,11 +43,16 @@ public class TopicListService {
public static class TopicListWatcher implements BiConsumer<String,
NotificationType> {
+ /** Topic names which are matching, the topic name contains the
partition suffix. **/
private final List<String> matchingTopics;
private final TopicListService topicListService;
private final long id;
+ /** The regexp for the topic name(not contains partition suffix). **/
private final Pattern topicsPattern;
+ /***
+ * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
+ */
public TopicListWatcher(TopicListService topicListService, long id,
Pattern topicsPattern, List<String> topics) {
this.topicListService = topicListService;
@@ -59,9 +65,12 @@ public class TopicListService {
return matchingTopics;
}
+ /***
+ * @param topicName topic name which contains partition suffix.
+ */
@Override
public void accept(String topicName, NotificationType
notificationType) {
- if (topicsPattern.matcher(topicName).matches()) {
+ if
(topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches())
{
List<String> newTopics;
List<String> deletedTopics;
if (notificationType == NotificationType.Deleted) {
@@ -109,6 +118,9 @@ public class TopicListService {
}
}
+ /***
+ * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
+ */
public void handleWatchTopicList(NamespaceName namespaceName, long
watcherId, long requestId, Pattern topicsPattern,
String topicsHash, Semaphore
lookupSemaphore) {
@@ -184,7 +196,9 @@ public class TopicListService {
});
}
-
+ /***
+ * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
+ */
public void
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, Pattern topicsPattern) {
namespaceService.getListOfPersistentTopics(namespace).
@@ -246,6 +260,10 @@ public class TopicListService {
log.info("[{}] Closed watcher, watcherId={}",
connection.getRemoteAddress(), watcherId);
}
+ /**
+ * @param deletedTopics names of the deleted topics (including the partition suffix).
+ * @param newTopics names of the added topics (including the partition suffix).
+ */
public void sendTopicListUpdate(long watcherId, String topicsHash,
List<String> deletedTopics,
List<String> newTopics) {
connection.getCommandSender().sendWatchTopicListUpdate(watcherId,
newTopics, deletedTopics, topicsHash);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9bcbdfed4c9..7707abafde8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -681,16 +681,27 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
}
}
- @DataProvider(name= "partitioned")
- public Object[][] partitioned(){
+ @DataProvider(name= "regexpConsumerArgs")
+ public Object[][] regexpConsumerArgs(){
return new Object[][]{
- {true},
- {false}
+ {true, true},
+ {true, false},
+ {false, true},
+ {false, false}
};
}
- @Test(timeOut = testTimeout, dataProvider = "partitioned")
- public void testPreciseRegexpSubscribe(boolean partitioned) throws
Exception {
+ /**
+ * Waits until the {@code watcherFuture} field of the given consumer is done and has not
+ * completed exceptionally. The field is read reflectively on every poll, and its current
+ * state is logged each time.
+ *
+ * @param consumer the consumer whose internal {@code watcherFuture} field is inspected
+ */
+ private void waitForTopicListWatcherStarted(Consumer<?> consumer) {
+ Awaitility.await().untilAsserted(() -> {
+ // Wildcard types instead of raw types: only completion state is inspected, never the value.
+ CompletableFuture<?> completableFuture =
WhiteboxImpl.getInternalState(consumer, "watcherFuture");
+ log.info("isDone: {}, isCompletedExceptionally: {}",
completableFuture.isDone(),
+ completableFuture.isCompletedExceptionally());
+ assertTrue(completableFuture.isDone() &&
!completableFuture.isCompletedExceptionally());
+ });
+ }
+
+ @Test(timeOut = testTimeout, dataProvider = "regexpConsumerArgs")
+ public void testPreciseRegexpSubscribe(boolean partitioned, boolean
createTopicAfterWatcherStarted) throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$",
topicName));
@@ -704,6 +715,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
+ if (createTopicAfterWatcherStarted) {
+ waitForTopicListWatcherStarted(consumer);
+ }
// 1. create topic.
if (partitioned) {
@@ -733,6 +747,14 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
}
}
+ /**
+ * Supplies both values of the {@code partitioned} flag, so a test using this provider runs
+ * once with {@code true} and once with {@code false}.
+ */
+ @DataProvider(name= "partitioned")
+ public Object[][] partitioned(){
+ return new Object[][]{
+ {true},
+ // Was a duplicated {true}: the non-partitioned case was never exercised.
+ {false}
+ };
+ }
+
@Test(timeOut = 240 * 1000, dataProvider = "partitioned")
public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean
partitioned) throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 01f205a3afd..ddb414a72d1 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -126,7 +126,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> topics(List<String> topicNames);
/**
- * Specify a pattern for topics that this consumer subscribes to.
+ * Specify a pattern for the topics that this consumer subscribes to. The pattern is matched
against topic names without the partition suffix.
*
* <p>The pattern is applied to subscribe to all topics, within a single
namespace, that match the
* pattern.
@@ -134,13 +134,13 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
- * a regular expression to select a list of topics to subscribe
to
+ * a regular expression selecting the topics to subscribe to; it is matched against
topic names without the partition suffix
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
/**
- * Specify a pattern for topics that this consumer subscribes to.
+ * Specify a pattern for the topics that this consumer subscribes to. The pattern is matched
against topic names without the partition suffix.
*
* <p>It accepts a regular expression that is compiled into a pattern
internally. E.g.,
* "persistent://public/default/pattern-topic-.*"
@@ -151,7 +151,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
- * given regular expression for topics pattern
+ * the regular expression used as the topics pattern; it is matched against topic
names without the partition suffix
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(String topicsPattern);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 84504b632ad..d18af475d61 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -929,7 +929,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
}
- // subscribe one more given topic
+ /***
+ * Subscribe one more given topic.
+ * @param topicName topic name without the partition suffix.
+ */
public CompletableFuture<Void> subscribeAsync(String topicName, boolean
createTopicIfDoesNotExist) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
@@ -1251,7 +1254,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return unsubscribeFuture;
}
- // Remove a consumer for a topic
+ /***
+ * Remove a consumer for a topic.
+ * @param topicName topic name contains the partition suffix.
+ */
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" +
topicName);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f3ebcdee6c0..4d179f7d914 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -67,6 +67,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;
+ /***
+ * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
+ */
public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
String topicsHash,
PulsarClientImpl client,
@@ -220,14 +223,26 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
}
interface TopicsChangedListener {
- // unsubscribe and delete ConsumerImpl in the `consumers` map in
`MultiTopicsConsumerImpl` based on added topics
+ /***
+ * unsubscribe and delete {@link ConsumerImpl} in the {@link
MultiTopicsConsumerImpl#consumers} map in
+ * {@link MultiTopicsConsumerImpl}.
+ * @param removedTopics topic names removed(contains the partition
suffix).
+ */
CompletableFuture<Void> onTopicsRemoved(Collection<String>
removedTopics);
- // subscribe and create a list of new ConsumerImpl, added them to the
`consumers` map in
- // `MultiTopicsConsumerImpl`.
+
+ /***
+ * subscribe and create a list of new {@link ConsumerImpl}, added them
to the
+ * {@link MultiTopicsConsumerImpl#consumers} map in {@link
MultiTopicsConsumerImpl}.
+ * @param addedTopics topic names added(contains the partition suffix).
+ */
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}
private class PatternTopicsChangedListener implements
TopicsChangedListener {
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public CompletableFuture<Void> onTopicsRemoved(Collection<String>
removedTopics) {
CompletableFuture<Void> removeFuture = new CompletableFuture<>();
@@ -249,6 +264,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
return removeFuture;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public CompletableFuture<Void> onTopicsAdded(Collection<String>
addedTopics) {
CompletableFuture<Void> addFuture = new CompletableFuture<>();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 489a07a606e..86adf69f06e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -59,6 +59,9 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
private final Runnable recheckTopicsChangeAfterReconnect;
+ /***
+ * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
+ */
public
TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener
topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern,
long watcherId,
NamespaceName namespace, String topicsHash,
@@ -142,7 +145,6 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
return;
}
}
-
this.connectionHandler.resetBackoff();
recheckTopicsChangeAfterReconnect.run();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 8760926792c..3ae0e977d13 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -65,7 +65,7 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
@ApiModelProperty(
name = "topicsPattern",
- value = "Topic pattern"
+ value = "The regexp for the topic name(not contains partition
suffix)."
)
private Pattern topicsPattern;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 34d47e2836b..65674af0ae1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1585,6 +1585,9 @@ public class Commands {
return cmd;
}
+ /***
+ * @param topics topic names which are matching, the topic name contains
the partition suffix.
+ */
public static BaseCommand newWatchTopicListSuccess(long requestId, long
watcherId, String topicsHash,
List<String> topics) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
@@ -1600,6 +1603,10 @@ public class Commands {
return cmd;
}
+ /**
+ * @param deletedTopics names of the deleted topics (including the partition suffix).
+ * @param newTopics names of the added topics (including the partition suffix).
+ */
public static BaseCommand newWatchTopicUpdate(long watcherId,
List<String> newTopics,
List<String> deletedTopics, String topicsHash) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);