This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new cf341857aad [branch-2.9][improve][client] Avoid timer task run before
previous subscribe complete. (#15747)
cf341857aad is described below
commit cf341857aad0e3b7534b6ae0901398cfd8efd062
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed May 25 09:43:52 2022 +0800
[branch-2.9][improve][client] Avoid timer task run before previous
subscribe complete. (#15747)
---
.../impl/PatternMultiTopicsConsumerImpl.java | 61 ++++++++++------------
1 file changed, 29 insertions(+), 32 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 2f946af712b..114cb274bc3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -24,14 +24,15 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -81,40 +82,36 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
return;
}
- CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
- List<CompletableFuture<Void>> futures =
Lists.newArrayListWithExpectedSize(2);
-
- client.getLookup().getTopicsUnderNamespace(namespaceName,
subscriptionMode).thenAccept(topics -> {
- if (log.isDebugEnabled()) {
- log.debug("Get topics under namespace {}, topics.size: {}",
namespaceName.toString(), topics.size());
- topics.forEach(topicName ->
- log.debug("Get topics under namespace {}, topic: {}",
namespaceName.toString(), topicName));
- }
-
- List<String> newTopics =
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
- List<String> oldTopics = Lists.newArrayList();
- oldTopics.addAll(getPartitionedTopics());
- getPartitions().forEach(p -> {
- TopicName t = TopicName.get(p);
- if (!t.isPartitioned() ||
!oldTopics.contains(t.getPartitionedTopicName())) {
- oldTopics.add(p);
- }
- });
-
-
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
oldTopics)));
-
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
newTopics)));
- FutureUtil.waitForAll(futures)
- .thenAccept(finalFuture -> recheckFuture.complete(null))
- .exceptionally(ex -> {
+ client.getLookup().getTopicsUnderNamespace(namespaceName,
subscriptionMode)
+ .thenCompose(topics -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Get topics under namespace {}, topics.size:
{}",
+ namespaceName.toString(), topics.size());
+ topics.forEach(topicName ->
+ log.debug("Get topics under namespace {},
topic: {}",
+ namespaceName.toString(), topicName));
+ }
+ final List<String> newTopics =
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
+ final List<String> oldTopics = new
ArrayList<>(getPartitionedTopics());
+ for (String partition : getPartitions()) {
+ TopicName topicName = TopicName.get(partition);
+ if (!topicName.isPartitioned() ||
!oldTopics.contains(topicName.getPartitionedTopicName())) {
+ oldTopics.add(partition);
+ }
+ }
+ final List<CompletableFuture<?>> listenersCallback = new
ArrayList<>(2);
+
listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
oldTopics)));
+
listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
newTopics)));
+ return
FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
+ }).exceptionally(ex -> {
log.warn("[{}] Failed to recheck topics change: {}",
topic, ex.getMessage());
- recheckFuture.completeExceptionally(ex);
return null;
+ }).thenAccept(__ -> {
+ // schedule the next re-check task
+ this.recheckPatternTimeout = client.timer()
+ .newTimeout(PatternMultiTopicsConsumerImpl.this,
+ Math.max(1,
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
});
- });
-
- // schedule the next re-check task
- this.recheckPatternTimeout =
client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
- Math.max(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.SECONDS);
}
public Pattern getPattern() {