This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc3d7329cf [improve][client] Disable polling pattern topics when
TopicListWatcher is enabled. (#20779)
0cc3d7329cf is described below
commit 0cc3d7329cf5c77879f81b99e574972bdae90988
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Jul 17 21:46:01 2023 +0800
[improve][client] Disable polling pattern topics when TopicListWatcher is
enabled. (#20779)
---
.../pulsar/client/impl/PatternTopicsConsumerImplTest.java | 6 +++++-
.../org/apache/pulsar/client/impl/TopicsConsumerImplTest.java | 1 +
.../pulsar/client/impl/PatternMultiTopicsConsumerImpl.java | 10 ++++++----
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 572a33e871d..c8f7b721cce 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -33,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
+import io.netty.util.Timeout;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -809,7 +811,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 7. call recheckTopics to unsubscribe topic 1,3, verify topics
number: 2=6-1-3
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
- consumer1.run(consumer1.getRecheckPatternTimeout());
+ Timeout recheckPatternTimeout =
spy(consumer1.getRecheckPatternTimeout());
+ doReturn(false).when(recheckPatternTimeout).isCancelled();
+ consumer1.run(recheckPatternTimeout);
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 73fe9799642..51b32c2b44e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1327,6 +1327,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions,
3);
consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+ Assert.assertTrue(consumer.getRecheckPatternTimeout().isCancelled());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 12c7e4e4ba3..c6ea6216cc1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -83,10 +83,12 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
long watcherId = client.newTopicListWatcherId();
new TopicListWatcher(topicsChangeListener, client, topicsPattern,
watcherId,
namespaceName, topicsHash, watcherFuture);
- watcherFuture.exceptionally(ex -> {
- log.debug("Unable to create topic list watcher. Falling back
to only polling for new topics", ex);
- return null;
- });
+ watcherFuture
+ .thenAccept(__ -> recheckPatternTimeout.cancel())
+ .exceptionally(ex -> {
+ log.warn("Unable to create topic list watcher. Falling back
to only polling for new topics", ex);
+ return null;
+ });
} else {
log.debug("Not creating topic list watcher for subscription mode
{}", subscriptionMode);
watcherFuture.complete(null);