This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 734f79f1baf [fix][client] Pattern subscription regression when 
broker-side evaluation is disabled (#24104)
734f79f1baf is described below

commit 734f79f1baf5c8858d3702ad5dd5c3030030ba00
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon Mar 24 15:07:27 2025 +0800

    [fix][client] Pattern subscription regression when broker-side evaluation 
is disabled (#24104)
    
    (cherry picked from commit d7962a100ad8bfaa5444aaeff465f8c0cc742d32)
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 73 +++++++++++++++++++---
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  3 -
 .../impl/PatternMultiTopicsConsumerImpl.java       | 37 +++++------
 3 files changed, 83 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 24c8b67a423..6f724e5d3bd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -589,12 +590,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
             .create();
 
-        // 5. call recheckTopics to subscribe each added topics above
-        log.debug("recheck topics change");
-        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
-        consumer1.run(consumer1.getRecheckPatternTimeout());
-
-        // 6. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
+        // 5. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
         Awaitility.await().untilAsserted(() -> {
             assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
@@ -699,6 +695,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         // 0. Need make sure topic watcher started
         waitForTopicListWatcherStarted(consumer);
 
+        // If the broker's topic list watcher is enabled, recheckPatternTimeout will be 
null.
+        assertNull(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getRecheckPatternTimeout());
+
         // 1. create partition topic
         String topicName = "persistent://my-property/my-ns/test-pattern" + key;
         admin.topics().createPartitionedTopic(topicName, 4);
@@ -717,6 +716,67 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         pulsarClient.close();
     }
 
+    @DataProvider(name= "topicDomain")
+    public Object[][] topicDomain(){
+        return new Object[][]{
+                {"persistent"},
+                {"non-persistent"},
+        };
+    }
+
+    @Test(timeOut = testTimeout, dataProvider = "topicDomain")
+    public void testSubscribePatterBrokerDisable(String topicDomain) throws 
Exception {
+        if(topicDomain.equals("persistent")) {
+            conf.setEnableBrokerSideSubscriptionPatternEvaluation(false);
+        }
+        final String key = "testSubscribePatterWithOutTopicDomain";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final Pattern pattern = 
Pattern.compile("my-property/my-ns/test-pattern.*");
+
+        // 0. Create topic1 with 4 partition
+        String topicName1 = topicDomain + 
"://my-property/my-ns/test-pattern-0" + key;
+        admin.topics().createPartitionedTopic(topicName1, 4);
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1).create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionTopicsMode(topicDomain.equals("persistent") ? 
RegexSubscriptionMode.PersistentOnly 
+                        : RegexSubscriptionMode.NonPersistentOnly)
+                .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
+                .receiverQueueSize(4)
+                .subscribe();
+
+        // 1. Make sure the first recheck has completed
+        Awaitility.await().untilAsserted(() -> {
+            int recheckEpoch = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getRecheckPatternEpoch();
+            assertTrue(recheckEpoch > 0);
+        });
+
+        // 2. Create topic2 with 4 partition
+        String topicName2 = topicDomain + "://my-property/my-ns/test-pattern" 
+ key;
+        admin.topics().createPartitionedTopic(topicName2, 4);
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2).create();
+
+        // 3. verify will update the partitions and consumers
+        assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern().pattern());
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 8);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 8);
+            assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
+        });
+
+        // cleanup.
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        admin.topics().deletePartitionedTopic(topicName1);
+        admin.topics().deletePartitionedTopic(topicName2);
+        pulsarClient.close();
+        conf.setEnableBrokerSideSubscriptionPatternEvaluation(true);
+    }
+
     @DataProvider(name= "regexpConsumerArgs")
     public Object[][] regexpConsumerArgs(){
         return new Object[][]{
@@ -952,7 +1012,6 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         // 7. call recheckTopics to subscribe each added topics above, verify 
topics number: 10=1+2+3+4
         log.debug("recheck topics change");
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
-        consumer1.run(consumer1.getRecheckPatternTimeout());
         Awaitility.await().untilAsserted(() -> {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 10);
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 3c7cd16f144..18e278c7ce3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1350,9 +1350,6 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         admin.topics().createPartitionedTopic(topicName1, 3);
         
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 
3);
 
-        
consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
-        Assert.assertTrue(consumer.getRecheckPatternTimeout().isCancelled());
-
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
             Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 
8);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 91c6da26d59..7268135e8d8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -44,8 +44,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.topics.TopicList;
-import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,16 +55,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     private final CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
     protected NamespaceName namespaceName;
 
-    /**
-     * There is two task to re-check topic changes, the both tasks will not be 
take affects at the same time.
-     * 1. {@link #recheckTopicsChangeAfterReconnect}: it will be called after 
the {@link TopicListWatcher} reconnected
-     *     if you enabled {@link TopicListWatcher}. This backoff used to do a 
retry if
-     *     {@link #recheckTopicsChangeAfterReconnect} is failed.
-     * 2. {@link #run} A scheduled task to trigger re-check topic changes, it 
will be used if you disabled
-     *     {@link TopicListWatcher}.
-     */
-    private final Backoff recheckPatternTaskBackoff;
     private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
+    // If recheckPatternTimeout is not null, it means the broker's topic 
watcher is disabled.
+    // The client needs to fall back to the polling model.
     private volatile Timeout recheckPatternTimeout = null;
     private volatile String topicsHash;
 
@@ -89,11 +80,6 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         this.topicsPattern = topicsPattern;
         this.topicsHash = topicsHash;
         this.subscriptionMode = subscriptionMode;
-        this.recheckPatternTaskBackoff = new BackoffBuilder()
-                
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
-                
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
-                .setMandatoryStop(0, TimeUnit.SECONDS)
-                .create();
 
         if (this.namespaceName == null) {
             this.namespaceName = getNameSpaceFromPattern(topicsPattern);
@@ -102,23 +88,24 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
 
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
-        this.recheckPatternTimeout = client.timer()
-                .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
             new TopicListWatcher(updateTaskQueue, client, topicsPattern, 
watcherId,
                 namespaceName, topicsHash, watcherFuture, () -> 
recheckTopicsChangeAfterReconnect());
             watcherFuture
-               .thenAccept(__ -> recheckPatternTimeout.cancel())
                .exceptionally(ex -> {
                    log.warn("Pattern consumer [{}] unable to create topic list 
watcher. Falling back to only polling"
                            + " for new topics", conf.getSubscriptionName(), 
ex);
+                   this.recheckPatternTimeout = client.timer().newTimeout(
+                           this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
                    return null;
                });
         } else {
             log.debug("Pattern consumer [{}] not creating topic list watcher 
for subscription mode {}",
                     conf.getSubscriptionName(), subscriptionMode);
             watcherFuture.complete(null);
+            this.recheckPatternTimeout = client.timer().newTimeout(
+                    this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
         }
     }
 
@@ -155,7 +142,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                 // If "recheckTopicsChange" has been called more than one 
times, only make the last one take affects.
                 // Use "synchronized (recheckPatternTaskBackoff)" instead of
                 // `synchronized(PatternMultiTopicsConsumerImpl.this)` to 
avoid locking in a wider range.
-                synchronized (recheckPatternTaskBackoff) {
+                synchronized (PatternMultiTopicsConsumerImpl.this) {
                     if (recheckPatternEpoch.get() > epoch) {
                         return CompletableFuture.completedFuture(null);
                     }
@@ -173,6 +160,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                     return updateSubscriptions(topicsPattern, 
this::setTopicsHash, getTopicsResult,
                             topicsChangeListener, oldTopics, subscription);
                 }
+            }).thenAccept(__ -> {
+                if (recheckPatternTimeout != null) {
+                    this.recheckPatternTimeout = client.timer().newTimeout(
+                            this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+                }
             });
     }
 
@@ -414,6 +406,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         return FutureUtil.waitForAll(closeFutures);
     }
 
+    @VisibleForTesting
+    int getRecheckPatternEpoch() {
+        return recheckPatternEpoch.get();
+    }
+
     @VisibleForTesting
     Timeout getRecheckPatternTimeout() {
         return recheckPatternTimeout;

Reply via email to