This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new cf341857aad [branch-2.9][improve][client] Avoid timer task run before 
previous subscribe complete. (#15747)
cf341857aad is described below

commit cf341857aad0e3b7534b6ae0901398cfd8efd062
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed May 25 09:43:52 2022 +0800

    [branch-2.9][improve][client] Avoid timer task run before previous 
subscribe complete. (#15747)
---
 .../impl/PatternMultiTopicsConsumerImpl.java       | 61 ++++++++++------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 2f946af712b..114cb274bc3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -24,14 +24,15 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -81,40 +82,36 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             return;
         }
 
-        CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
-        List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(2);
-
-        client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode).thenAccept(topics -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Get topics under namespace {}, topics.size: {}", 
namespaceName.toString(), topics.size());
-                topics.forEach(topicName ->
-                    log.debug("Get topics under namespace {}, topic: {}", 
namespaceName.toString(), topicName));
-            }
-
-            List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
-            List<String> oldTopics = Lists.newArrayList();
-            oldTopics.addAll(getPartitionedTopics());
-            getPartitions().forEach(p -> {
-                TopicName t = TopicName.get(p);
-                if (!t.isPartitioned() || 
!oldTopics.contains(t.getPartitionedTopicName())) {
-                    oldTopics.add(p);
-                }
-            });
-
-            
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, 
oldTopics)));
-            
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, 
newTopics)));
-            FutureUtil.waitForAll(futures)
-                .thenAccept(finalFuture -> recheckFuture.complete(null))
-                .exceptionally(ex -> {
+        client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode)
+                .thenCompose(topics -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Get topics under namespace {}, topics.size: 
{}",
+                                namespaceName.toString(), topics.size());
+                        topics.forEach(topicName ->
+                                log.debug("Get topics under namespace {}, 
topic: {}",
+                                        namespaceName.toString(), topicName));
+                    }
+                    final List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
+                    final List<String> oldTopics = new 
ArrayList<>(getPartitionedTopics());
+                    for (String partition : getPartitions()) {
+                        TopicName topicName = TopicName.get(partition);
+                        if (!topicName.isPartitioned() || 
!oldTopics.contains(topicName.getPartitionedTopicName())) {
+                            oldTopics.add(partition);
+                        }
+                    }
+                    final List<CompletableFuture<?>> listenersCallback = new 
ArrayList<>(2);
+                    
listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
 oldTopics)));
+                    
listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
 newTopics)));
+                    return 
FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
+                }).exceptionally(ex -> {
                     log.warn("[{}] Failed to recheck topics change: {}", 
topic, ex.getMessage());
-                    recheckFuture.completeExceptionally(ex);
                     return null;
+                }).thenAccept(__ -> {
+                    // schedule the next re-check task
+                    this.recheckPatternTimeout = client.timer()
+                            .newTimeout(PatternMultiTopicsConsumerImpl.this,
+                                    Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
                 });
-        });
-
-        // schedule the next re-check task
-        this.recheckPatternTimeout = 
client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
-                Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
     }
 
     public Pattern getPattern() {

Reply via email to