This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 734f79f1baf [fix][client] Pattern subscription regression when broker-side evaluation is disabled (#24104) 734f79f1baf is described below commit 734f79f1baf5c8858d3702ad5dd5c3030030ba00 Author: Baodi Shi <ba...@apache.org> AuthorDate: Mon Mar 24 15:07:27 2025 +0800 [fix][client] Pattern subscription regression when broker-side evaluation is disabled (#24104) (cherry picked from commit d7962a100ad8bfaa5444aaeff465f8c0cc742d32) --- .../client/impl/PatternTopicsConsumerImplTest.java | 73 +++++++++++++++++++--- .../pulsar/client/impl/TopicsConsumerImplTest.java | 3 - .../impl/PatternMultiTopicsConsumerImpl.java | 37 +++++------ 3 files changed, 83 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 24c8b67a423..6f724e5d3bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -589,12 +590,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - // 5. call recheckTopics to subscribe each added topics above - log.debug("recheck topics change"); - PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); - consumer1.run(consumer1.getRecheckPatternTimeout()); - - // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. + // 5. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. Awaitility.await().untilAsserted(() -> { assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6); @@ -699,6 +695,9 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 0. Need make sure topic watcher started waitForTopicListWatcherStarted(consumer); + // if broker enable watch topic, then recheckPatternTimeout will be null. + assertNull(((PatternMultiTopicsConsumerImpl<?>) consumer).getRecheckPatternTimeout()); + // 1. create partition topic String topicName = "persistent://my-property/my-ns/test-pattern" + key; admin.topics().createPartitionedTopic(topicName, 4); @@ -717,6 +716,67 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { pulsarClient.close(); } + @DataProvider(name= "topicDomain") + public Object[][] topicDomain(){ + return new Object[][]{ + {"persistent"}, + {"non-persistent"}, + }; + } + + @Test(timeOut = testTimeout, dataProvider = "topicDomain") + public void testSubscribePatterBrokerDisable(String topicDomain) throws Exception { + if(topicDomain.equals("persistent")) { + conf.setEnableBrokerSideSubscriptionPatternEvaluation(false); + } + final String key = "testSubscribePatterWithOutTopicDomain"; + final String subscriptionName = "my-ex-subscription-" + key; + final Pattern pattern = Pattern.compile("my-property/my-ns/test-pattern.*"); + + // 0. Create topic1 with 4 partition + String topicName1 = topicDomain + "://my-property/my-ns/test-pattern-0" + key; + admin.topics().createPartitionedTopic(topicName1, 4); + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create(); + + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionTopicsMode(topicDomain.equals("persistent") ? RegexSubscriptionMode.PersistentOnly + : RegexSubscriptionMode.NonPersistentOnly) + .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS) + .receiverQueueSize(4) + .subscribe(); + + // 1. Need make sure first check complete + Awaitility.await().untilAsserted(() -> { + int recheckEpoch = ((PatternMultiTopicsConsumerImpl<?>) consumer).getRecheckPatternEpoch(); + assertTrue(recheckEpoch > 0); + }); + + // 2. Create topic2 with 4 partition + String topicName2 = topicDomain + "://my-property/my-ns/test-pattern" + key; + admin.topics().createPartitionedTopic(topicName2, 4); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create(); + + // 3. verify will update the partitions and consumers + assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern()); + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 8); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 8); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2); + }); + + // cleanup. + consumer.close(); + producer1.close(); + producer2.close(); + admin.topics().deletePartitionedTopic(topicName1); + admin.topics().deletePartitionedTopic(topicName2); + pulsarClient.close(); + conf.setEnableBrokerSideSubscriptionPatternEvaluation(true); + } + @DataProvider(name= "regexpConsumerArgs") public Object[][] regexpConsumerArgs(){ return new Object[][]{ @@ -952,7 +1012,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 log.debug("recheck topics change"); PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); - consumer1.run(consumer1.getRecheckPatternTimeout()); Awaitility.await().untilAsserted(() -> { assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 10); assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 3c7cd16f144..18e278c7ce3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -1350,9 +1350,6 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.topics().createPartitionedTopic(topicName1, 3); assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3); - consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout()); - Assert.assertTrue(consumer.getRecheckPatternTimeout().isCancelled()); - Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8); Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 91c6da26d59..7268135e8d8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -44,8 +44,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.topics.TopicList; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +55,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>(); protected NamespaceName namespaceName; - /** - * There is two task to re-check topic changes, the both tasks will not be take affects at the same time. - * 1. {@link #recheckTopicsChangeAfterReconnect}: it will be called after the {@link TopicListWatcher} reconnected - * if you enabled {@link TopicListWatcher}. This backoff used to do a retry if - * {@link #recheckTopicsChangeAfterReconnect} is failed. - * 2. {@link #run} A scheduled task to trigger re-check topic changes, it will be used if you disabled - * {@link TopicListWatcher}. - */ - private final Backoff recheckPatternTaskBackoff; private final AtomicInteger recheckPatternEpoch = new AtomicInteger(); + // If recheckPatternTimeout is not null, it means the broker's topic watcher is disabled. + // The client need falls back to the polling model. private volatile Timeout recheckPatternTimeout = null; private volatile String topicsHash; @@ -89,11 +80,6 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T this.topicsPattern = topicsPattern; this.topicsHash = topicsHash; this.subscriptionMode = subscriptionMode; - this.recheckPatternTaskBackoff = new BackoffBuilder() - .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS) - .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) - .setMandatoryStop(0, TimeUnit.SECONDS) - .create(); if (this.namespaceName == null) { this.namespaceName = getNameSpaceFromPattern(topicsPattern); @@ -102,23 +88,24 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T this.topicsChangeListener = new PatternTopicsChangedListener(); this.updateTaskQueue = new PatternConsumerUpdateQueue(this); - this.recheckPatternTimeout = client.timer() - .newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); if (subscriptionMode == Mode.PERSISTENT) { long watcherId = client.newTopicListWatcherId(); new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId, namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect()); watcherFuture - .thenAccept(__ -> recheckPatternTimeout.cancel()) .exceptionally(ex -> { log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling" + " for new topics", conf.getSubscriptionName(), ex); + this.recheckPatternTimeout = client.timer().newTimeout( + this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); return null; }); } else { log.debug("Pattern consumer [{}] not creating topic list watcher for subscription mode {}", conf.getSubscriptionName(), subscriptionMode); watcherFuture.complete(null); + this.recheckPatternTimeout = client.timer().newTimeout( + this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); } } @@ -155,7 +142,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T // If "recheckTopicsChange" has been called more than one times, only make the last one take affects. // Use "synchronized (recheckPatternTaskBackoff)" instead of // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range. - synchronized (recheckPatternTaskBackoff) { + synchronized (PatternMultiTopicsConsumerImpl.this) { if (recheckPatternEpoch.get() > epoch) { return CompletableFuture.completedFuture(null); } @@ -173,6 +160,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult, topicsChangeListener, oldTopics, subscription); } + }).thenAccept(__ -> { + if (recheckPatternTimeout != null) { + this.recheckPatternTimeout = client.timer().newTimeout( + this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); + } }); } @@ -414,6 +406,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T return FutureUtil.waitForAll(closeFutures); } + @VisibleForTesting + int getRecheckPatternEpoch() { + return recheckPatternEpoch.get(); + } + @VisibleForTesting Timeout getRecheckPatternTimeout() { return recheckPatternTimeout;