This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1f17b1a08482cda96e944a0331277c7429a276bd Author: lipenghui <[email protected]> AuthorDate: Fri Feb 5 14:46:47 2021 +0800 Fix the partition number not equals expected error (#9446) Fixes #8000 ### Motivation Fix the partition number not equals expected error ### Verifying this change New tests added, without this fix, you can see errors like `topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5` (cherry picked from commit bbce00a2245cf05b829182a9a75a86d4e1139492) --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 8 +++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 227e74d..17da94a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1170,4 +1171,48 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } } + @Test(timeOut = testTimeout) + public void testPartitionsUpdatesForMultipleTopics() throws Exception { + final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0"; + final String subName = "my-sub"; + admin.topics().createPartitionedTopic(topicName0, 2); + assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2); + + PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING) + .topicsPattern("persistent://public/default/test.*") + .subscriptionType(SubscriptionType.Failover) + .subscriptionName(subName) + .subscribe(); + + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2); + + admin.topics().updatePartitionedTopic(topicName0, 5); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5); + }); + + final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1"; + admin.topics().createPartitionedTopic(topicName1, 3); + assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3); + + consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8); + }); + + admin.topics().updatePartitionedTopic(topicName1, 5); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10); + }); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4149b39..55ee806 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1154,6 +1154,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return consumers.values().stream().collect(Collectors.toList()); } + // get all partitions that in the topics map + int getPartitionsOfTheTopicMap() { + return topics.values().stream().mapToInt(Integer::intValue).sum(); + } + @Override public void pause() { synchronized (pauseMutex) { @@ -1228,7 +1233,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { future.complete(null); return future; } else if (oldPartitionNumber < currentPartitionNumber) { - allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber); + allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber); + topics.put(topicName, currentPartitionNumber); List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber); // subscribe new added partitions List<CompletableFuture<Consumer<T>>> futureList = newPartitions
