This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc3d7329cf [improve][client] Disable polling pattern topics when TopicListWatcher is enabled. (#20779)
0cc3d7329cf is described below

commit 0cc3d7329cf5c77879f81b99e574972bdae90988
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Jul 17 21:46:01 2023 +0800

    [improve][client] Disable polling pattern topics when TopicListWatcher is enabled. (#20779)
---
 .../pulsar/client/impl/PatternTopicsConsumerImplTest.java      |  6 +++++-
 .../org/apache/pulsar/client/impl/TopicsConsumerImplTest.java  |  1 +
 .../pulsar/client/impl/PatternMultiTopicsConsumerImpl.java     | 10 ++++++----
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 572a33e871d..c8f7b721cce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -33,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
+import io.netty.util.Timeout;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -809,7 +811,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         // 7. call recheckTopics to unsubscribe topic 1,3, verify topics 
number: 2=6-1-3
         log.debug("recheck topics change");
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
-        consumer1.run(consumer1.getRecheckPatternTimeout());
+        Timeout recheckPatternTimeout = 
spy(consumer1.getRecheckPatternTimeout());
+        doReturn(false).when(recheckPatternTimeout).isCancelled();
+        consumer1.run(recheckPatternTimeout);
         Thread.sleep(100);
         assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitions().size(), 2);
         assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 73fe9799642..51b32c2b44e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1327,6 +1327,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 
3);
 
         
consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+        Assert.assertTrue(consumer.getRecheckPatternTimeout().isCancelled());
 
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 12c7e4e4ba3..c6ea6216cc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -83,10 +83,12 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             long watcherId = client.newTopicListWatcherId();
             new TopicListWatcher(topicsChangeListener, client, topicsPattern, 
watcherId,
                 namespaceName, topicsHash, watcherFuture);
-            watcherFuture.exceptionally(ex -> {
-                log.debug("Unable to create topic list watcher. Falling back 
to only polling for new topics", ex);
-                return null;
-            });
+            watcherFuture
+               .thenAccept(__ -> recheckPatternTimeout.cancel())
+               .exceptionally(ex -> {
+                   log.warn("Unable to create topic list watcher. Falling back 
to only polling for new topics", ex);
+                   return null;
+               });
         } else {
             log.debug("Not creating topic list watcher for subscription mode 
{}", subscriptionMode);
             watcherFuture.complete(null);

Reply via email to