This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa227a066723ea707bc676d9193b5cebde43c15b Author: Qiang Zhao <[email protected]> AuthorDate: Thu Mar 24 10:50:19 2022 +0800 [improve][client] Avoid timertask run before previous subscribe complete. (#14818) If configuration ``patternAutoDiscoveryPeriod`` is small, there may be some unnecessary subscribe requests. *Describe the modifications you've done.* (cherry picked from commit 0fe921f32cefe7648ca428cd9861f9163c69767d) --- .../impl/PatternMultiTopicsConsumerImpl.java | 63 +++++++++++----------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 2f946af712b..081e4a391c2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -80,41 +82,36 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T if (timeout.isCancelled()) { return; } - - CompletableFuture<Void> recheckFuture = new CompletableFuture<>(); - List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(2); - - client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> { - if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); - topics.forEach(topicName -> - log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); - } - - List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); - List<String> oldTopics = Lists.newArrayList(); - oldTopics.addAll(getPartitionedTopics()); - getPartitions().forEach(p -> { - TopicName t = TopicName.get(p); - if (!t.isPartitioned() || !oldTopics.contains(t.getPartitionedTopicName())) { - oldTopics.add(p); + client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode) + .thenCompose(topics -> { + if (log.isDebugEnabled()) { + log.debug("Get topics under namespace {}, topics.size: {}", + namespaceName.toString(), topics.size()); + topics.forEach(topicName -> + log.debug("Get topics under namespace {}, topic: {}", + namespaceName.toString(), topicName)); + } + final List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); + final List<String> oldTopics = new ArrayList<>(getPartitionedTopics()); + for (String partition : getPartitions()) { + TopicName topicName = TopicName.get(partition); + if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) { + oldTopics.add(partition); + } } + final List<CompletableFuture<?>> listenersCallback = new ArrayList<>(2); + listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); + listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); + return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback)); + }).exceptionally(ex -> { + log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); + return null; + }).thenAccept(__ -> { + // schedule the next re-check task + this.recheckPatternTimeout = client.timer() + .newTimeout(PatternMultiTopicsConsumerImpl.this, + Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); }); - - futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); - futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); - FutureUtil.waitForAll(futures) - .thenAccept(finalFuture -> recheckFuture.complete(null)) - .exceptionally(ex -> { - log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); - recheckFuture.completeExceptionally(ex); - return null; - }); - }); - - // schedule the next re-check task - this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, - Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); } public Pattern getPattern() {
