This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa227a066723ea707bc676d9193b5cebde43c15b
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Mar 24 10:50:19 2022 +0800

    [improve][client] Avoid running the timer task before the previous subscribe completes. 
(#14818)
    
    If the configured ``patternAutoDiscoveryPeriod`` is small, the next re-check 
can start before the previous one has finished, causing unnecessary subscribe requests.
    
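    Schedule the next re-check task only after the previous re-check (topic lookup and the topics-added/removed callbacks) has completed, instead of scheduling it immediately.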
    
    (cherry picked from commit 0fe921f32cefe7648ca428cd9861f9163c69767d)
---
 .../impl/PatternMultiTopicsConsumerImpl.java       | 63 +++++++++++-----------
 1 file changed, 30 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 2f946af712b..081e4a391c2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -80,41 +82,36 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         if (timeout.isCancelled()) {
             return;
         }
-
-        CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
-        List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(2);
-
-        client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode).thenAccept(topics -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Get topics under namespace {}, topics.size: {}", 
namespaceName.toString(), topics.size());
-                topics.forEach(topicName ->
-                    log.debug("Get topics under namespace {}, topic: {}", 
namespaceName.toString(), topicName));
-            }
-
-            List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
-            List<String> oldTopics = Lists.newArrayList();
-            oldTopics.addAll(getPartitionedTopics());
-            getPartitions().forEach(p -> {
-                TopicName t = TopicName.get(p);
-                if (!t.isPartitioned() || 
!oldTopics.contains(t.getPartitionedTopicName())) {
-                    oldTopics.add(p);
+        client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode)
+            .thenCompose(topics -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Get topics under namespace {}, topics.size: {}",
+                            namespaceName.toString(), topics.size());
+                    topics.forEach(topicName ->
+                            log.debug("Get topics under namespace {}, topic: 
{}",
+                                    namespaceName.toString(), topicName));
+                }
+                final List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
+                final List<String> oldTopics = new 
ArrayList<>(getPartitionedTopics());
+                for (String partition : getPartitions()) {
+                    TopicName topicName = TopicName.get(partition);
+                    if (!topicName.isPartitioned() || 
!oldTopics.contains(topicName.getPartitionedTopicName())) {
+                        oldTopics.add(partition);
+                    }
                 }
+                final List<CompletableFuture<?>> listenersCallback = new 
ArrayList<>(2);
+                
listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
 oldTopics)));
+                
listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
 newTopics)));
+                return 
FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to recheck topics change: {}", topic, 
ex.getMessage());
+                return null;
+            }).thenAccept(__ -> {
+                // schedule the next re-check task
+                this.recheckPatternTimeout = client.timer()
+                        .newTimeout(PatternMultiTopicsConsumerImpl.this,
+                        Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
             });
-
-            
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, 
oldTopics)));
-            
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, 
newTopics)));
-            FutureUtil.waitForAll(futures)
-                .thenAccept(finalFuture -> recheckFuture.complete(null))
-                .exceptionally(ex -> {
-                    log.warn("[{}] Failed to recheck topics change: {}", 
topic, ex.getMessage());
-                    recheckFuture.completeExceptionally(ex);
-                    return null;
-                });
-        });
-
-        // schedule the next re-check task
-        this.recheckPatternTimeout = 
client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
-                Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
     }
 
     public Pattern getPattern() {

Reply via email to