This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eb173a  On multi-topic consumer, we shouldn't keep checking the partitioned metadata (#10708)
9eb173a is described below

commit 9eb173af32888e4d1361fee2f0dd034a9def2636
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 28 22:15:52 2021 -0700

    On multi-topic consumer, we shouldn't keep checking the partitioned metadata (#10708)
    
    * On multi-topic consumer, we shouldn't keep checking the partitioned metadata
    
    * Added NON_PARTITIONED constant
    
    * Removed assertion that is now invalid
    
    * Fixed handling of deleted partitioned topic
    
    * Fixed re-subscribing same topic
---
 .../client/impl/PatternTopicsConsumerImplTest.java |  44 ++++----
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  18 +--
 .../client/impl/MultiTopicsConsumerImpl.java       | 123 +++++++++++----------
 .../impl/PatternMultiTopicsConsumerImpl.java       |  13 ++-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  43 +++++++
 .../common/partition/PartitionedTopicMetadata.java |   5 +
 6 files changed, 153 insertions(+), 93 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 55649ab..bf6edf4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -183,12 +183,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
         topics.forEach(topic -> log.debug("topic: {}", topic));
         consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -196,7 +196,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, topics.size()).forEach(index ->
             assertEquals(consumers.get(index).getTopic(), topics.get(index)));
 
-        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: 
{}", topic));
 
         // 5. produce data
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -286,12 +286,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 1);
         assertEquals(consumers.size(), 1);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 1);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
 
         topics.forEach(topic -> log.debug("topic: {}", topic));
         consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -299,7 +299,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, topics.size()).forEach(index ->
             assertEquals(consumers.get(index).getTopic(), topics.get(index)));
 
-        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: 
{}", topic));
 
         // 5. produce data
         for (int i = 0; i < totalMessages / 4; i++) {
@@ -377,12 +377,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 7);
         assertEquals(consumers.size(), 7);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
         topics.forEach(topic -> log.debug("topic: {}", topic));
         consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -390,7 +390,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, topics.size()).forEach(index ->
             assertEquals(consumers.get(index).getTopic(), topics.get(index)));
 
-        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: 
{}", topic));
 
         // 5. produce data
         for (int i = 0; i < totalMessages / 4; i++) {
@@ -508,9 +508,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 3. verify consumer get methods, to get 5 number of partitions and 
topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 5);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 5);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 5);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
         // 4. create producer
         String messagePredicate = "my-message-" + key + "-";
@@ -537,9 +537,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 6. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
 
         // 7. produce data
@@ -614,9 +614,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods, to get 6 number of partitions and 
topics: 6=1+2+3
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -649,9 +649,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 10);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 10);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 3);
 
         // 8. produce data to topic3 and topic4, verify should receive all the 
message
         for (int i = 0; i < totalMessages / 2; i++) {
@@ -723,9 +723,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods, to get 0 number of partitions and 
topics: 6=1+2+3
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitions().size(), 6);
         assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 2);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -757,9 +757,9 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitions().size(), 2);
         assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
-        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 1);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 1);
 
         // 8. produce data to topic2, verify should receive all the message
         for (int i = 0; i < totalMessages; i++) {
@@ -808,7 +808,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 4. verify consumer get methods
         assertSame(consumerImpl.getPattern(), pattern);
-        assertEquals(consumerImpl.getTopics().size(), 2);
+        assertEquals(consumerImpl.getPartitionedTopics().size(), 0);
 
         producer1.send("msg-1");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index cb89d33..898e534 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -148,7 +148,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
         
assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
-        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         topics.forEach(topic -> log.info("topic: {}", topic));
@@ -157,7 +157,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, 6).forEach(index ->
             assertEquals(consumers.get(index).getTopic(), topics.get(index)));
 
-        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 3);
+        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
 
         consumer.unsubscribe();
         consumer.close();
@@ -563,12 +563,12 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages * 2 / 3);
 
         // 7. use getter to verify internal topics number after un-subscribe 
topic3
-        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitions();
         List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         assertEquals(topics.size(), 3);
         assertEquals(consumers.size(), 3);
-        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 2);
+        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 1);
 
         // 8. re-subscribe topic3
         CompletableFuture<Void> subFuture = 
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
@@ -594,12 +594,12 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages);
 
         // 11. use getter to verify internal topics number after subscribe 
topic3
-        topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
         consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 3);
+        assertEquals(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
 
         consumer.unsubscribe();
         consumer.close();
@@ -1181,20 +1181,20 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
-        Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 3);
+        Assert.assertEquals(consumer.getConsumers().size(), 3);
 
         admin.topics().deletePartitionedTopic(topicName, true);
         
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
-            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 
0);
+            Assert.assertEquals(consumer.getConsumers().size(), 0);
         });
 
         admin.topics().createPartitionedTopic(topicName, 7);
         
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
-            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 
7);
+            Assert.assertEquals(consumer.getConsumers().size(), 7);
         });
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index df6a830..63027a5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -80,7 +82,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
     // Map <topic, numPartitions>, store partition number for each topic
-    protected final ConcurrentHashMap<String, Integer> topics;
+    protected final ConcurrentHashMap<String, Integer> partitionedTopics;
 
     // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
     // shared incoming queue was full
@@ -141,7 +143,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics 
Consumer");
 
-        this.topics = new ConcurrentHashMap<>();
+        this.partitionedTopics = new ConcurrentHashMap<>();
         this.consumers = new ConcurrentHashMap<>();
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
@@ -805,7 +807,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private void removeTopic(String topic) {
         String fullTopicName = getFullTopicName(topic);
         if (fullTopicName != null) {
-            topics.remove(topic);
+            partitionedTopics.remove(topic);
         }
     }
 
@@ -817,7 +819,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     new PulsarClientException.AlreadyClosedException("Topic 
name not valid"));
         }
         String fullTopicName = topicNameInstance.toString();
-        if (topics.containsKey(fullTopicName) || 
topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
+        if (consumers.containsKey(fullTopicName)
+                || 
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.AlreadyClosedException("Already 
subscribed to " + topicName));
         }
@@ -881,7 +884,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     new PulsarClientException.AlreadyClosedException("Topic 
name not valid"));
         }
         String fullTopicName = topicNameInstance.toString();
-        if (topics.containsKey(fullTopicName) || 
topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
+        if (consumers.containsKey(fullTopicName)
+                || 
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.AlreadyClosedException("Already 
subscribed to " + topicName));
         }
@@ -916,10 +920,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
-        if (numPartitions > 0) {
+        if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) {
             // Below condition is true if subscribeAsync() has been invoked 
second time with same
             // topicName before the first invocation had reached this point.
-            boolean isTopicBeingSubscribedForInOtherThread = 
this.topics.putIfAbsent(topicName, numPartitions) != null;
+            boolean isTopicBeingSubscribedForInOtherThread =
+                    partitionedTopics.putIfAbsent(topicName, numPartitions) != 
null;
             if (isTopicBeingSubscribedForInOtherThread) {
                 String errorMessage = String.format("[%s] Failed to subscribe 
for topic [%s] in topics consumer. "
                     + "Topic is already being subscribed for in other 
thread.", topic, topicName);
@@ -955,27 +960,31 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     })
                 .collect(Collectors.toList());
         } else {
-            boolean isTopicBeingSubscribedForInOtherThread = 
this.topics.putIfAbsent(topicName, 1) != null;
-            if (isTopicBeingSubscribedForInOtherThread) {
-                String errorMessage = String.format("[%s] Failed to subscribe 
for topic [%s] in topics consumer. "
-                    + "Topic is already being subscribed for in other 
thread.", topic, topicName);
-                log.warn(errorMessage);
-                subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
-                return;
-            }
             allTopicPartitionsNumber.incrementAndGet();
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-            ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, 
topicName, internalConfig,
-                        client.externalExecutorProvider(), -1,
-                        true, subFuture, null, schema, interceptors,
-                        createIfDoesNotExist);
-            synchronized (pauseMutex) {
-                if (paused) {
-                    newConsumer.pause();
+
+            consumers.compute(topicName, (key, existingValue) -> {
+                if (existingValue != null) {
+                    String errorMessage = String.format("[%s] Failed to 
subscribe for topic [%s] in topics consumer. "
+                            + "Topic is already being subscribed for in other 
thread.", topic, topicName);
+                    log.warn(errorMessage);
+                    subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
+                    return existingValue;
+                } else {
+                    ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
+                            client.externalExecutorProvider(), -1,
+                            true, subFuture, null, schema, interceptors,
+                            createIfDoesNotExist);
+
+                    synchronized (pauseMutex) {
+                        if (paused) {
+                            newConsumer.pause();
+                        }
+                    }
+                    return newConsumer;
                 }
-                consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
-            }
+            });
 
             futureList = Collections.singletonList(subFuture);
         }
@@ -985,11 +994,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
                     setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                 }
-                int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
-                int currentAllTopicsPartitionsNumber = 
allTopicPartitionsNumber.get();
-                checkState(currentAllTopicsPartitionsNumber == numTopics,
-                    "allTopicPartitionsNumber " + 
currentAllTopicsPartitionsNumber
-                        + " not equals expected: " + numTopics);
 
                 // We have successfully created new consumers, so we can start 
receiving messages for them
                 startReceivingMessages(
@@ -1144,12 +1148,12 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
 
     // get topics name
-    public List<String> getTopics() {
-        return topics.keySet().stream().collect(Collectors.toList());
+    public List<String> getPartitionedTopics() {
+        return 
partitionedTopics.keySet().stream().collect(Collectors.toList());
     }
 
     // get partitioned topics name
-    public List<String> getPartitionedTopics() {
+    public List<String> getPartitions() {
         return consumers.keySet().stream().collect(Collectors.toList());
     }
 
@@ -1160,7 +1164,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     // get all partitions that in the topics map
     int getPartitionsOfTheTopicMap() {
-        return topics.values().stream().mapToInt(Integer::intValue).sum();
+        return 
partitionedTopics.values().stream().mapToInt(Integer::intValue).sum();
     }
 
     @Override
@@ -1221,17 +1225,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     // subscribe increased partitions for a given topic
     private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String 
topicName) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        int oldPartitionNumber = partitionedTopics.get(topicName);
 
-        //Drop the disconnected consumers to allow the auto discovery
-        consumers.entrySet().removeIf(e -> 
TopicName.get(e.getKey()).getPartitionedTopicName().equals(topicName) && 
!e.getValue().isConnected());
-        final int connectedPartitions = 
Long.valueOf(consumers.entrySet().stream().filter(e -> 
e.getKey().contains(topicName)).count()).intValue();
-        allTopicPartitionsNumber.set(allTopicPartitionsNumber.get() - 
(topics.get(topicName) - connectedPartitions));
-        topics.put(topicName, connectedPartitions);
-
-        client.getPartitionsForTopic(topicName).thenCompose(list -> {
-            int oldPartitionNumber = topics.get(topicName);
-            int currentPartitionNumber = Long.valueOf(list.stream().filter(t 
-> TopicName.get(t).isPartitioned()).count()).intValue();
+        return client.getPartitionsForTopic(topicName).thenCompose(list -> {
+            int currentPartitionNumber = Long.valueOf(list.stream()
+                    .filter(t -> 
TopicName.get(t).isPartitioned()).count()).intValue();
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] partitions number. old: {}, new: {}",
@@ -1240,11 +1238,28 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
             if (oldPartitionNumber == currentPartitionNumber) {
                 // topic partition number not changed
-                future.complete(null);
-                return future;
+                return CompletableFuture.completedFuture(null);
+            } else if (currentPartitionNumber == 
PartitionedTopicMetadata.NON_PARTITIONED) {
+                // The topic was initially partitioned but then it was 
deleted. We keep it in the topics
+                partitionedTopics.put(topicName, 0);
+
+                allTopicPartitionsNumber.addAndGet(-oldPartitionNumber);
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                for (Iterator<Map.Entry<String, ConsumerImpl<T>>> it = 
consumers.entrySet().iterator(); it.hasNext();) {
+                    Map.Entry<String, ConsumerImpl<T>> e = it.next();
+                    String partitionedTopicName = 
TopicName.get(e.getKey()).getPartitionedTopicName();
+
+                    // Remove the consumers that belong to the deleted 
partitioned topic
+                    if (partitionedTopicName.equals(topicName)) {
+                        futures.add(e.getValue().closeAsync());
+                        consumers.remove(e.getKey());
+                    }
+                }
+
+                return FutureUtil.waitForAll(futures);
             } else if (oldPartitionNumber < currentPartitionNumber) {
                 allTopicPartitionsNumber.addAndGet(currentPartitionNumber - 
oldPartitionNumber);
-                topics.put(topicName, currentPartitionNumber);
+                partitionedTopics.put(topicName, currentPartitionNumber);
                 List<String> newPartitions = list.subList(oldPartitionNumber, 
currentPartitionNumber);
                 // subscribe new added partitions
                 List<CompletableFuture<Consumer<T>>> futureList = newPartitions
@@ -1273,29 +1288,19 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     .collect(Collectors.toList());
 
                 // wait for all partitions subscribe future complete, then 
startReceivingMessages
-                FutureUtil.waitForAll(futureList)
+                return FutureUtil.waitForAll(futureList)
                     .thenAccept(finalFuture -> {
                         List<ConsumerImpl<T>> newConsumerList = 
newPartitions.stream()
                             .map(partitionTopic -> 
consumers.get(partitionTopic))
                             .collect(Collectors.toList());
                         startReceivingMessages(newConsumerList);
-                        future.complete(null);
-                    })
-                    .exceptionally(ex -> {
-                        log.warn("[{}] Failed to subscribe {} partition: {} - 
{} : {}",
-                            topic, topicName, oldPartitionNumber, 
currentPartitionNumber, ex);
-                        future.completeExceptionally(ex);
-                        return null;
                     });
             } else {
                 log.error("[{}] not support shrink topic partitions. old: {}, 
new: {}",
                     topicName, oldPartitionNumber, currentPartitionNumber);
-                future.completeExceptionally(new NotSupportedException("not 
support shrink topic partitions"));
+                return FutureUtil.failedFuture(new NotSupportedException("not 
support shrink topic partitions"));
             }
-            return future;
         });
-
-        return future;
     }
 
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@@ -1311,7 +1316,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
             // if last auto update not completed yet, do nothing.
             if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
-                partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(topics.keySet());
+                partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
             }
 
             // schedule the next re-check task
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 7769d70..2f946af 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -92,7 +92,14 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             }
 
             List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
-            List<String> oldTopics = 
PatternMultiTopicsConsumerImpl.this.getTopics();
+            List<String> oldTopics = Lists.newArrayList();
+            oldTopics.addAll(getPartitionedTopics());
+            getPartitions().forEach(p -> {
+                TopicName t = TopicName.get(p);
+                if (!t.isPartitioned() || 
!oldTopics.contains(t.getPartitionedTopicName())) {
+                    oldTopics.add(p);
+                }
+            });
 
             
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, 
oldTopics)));
             
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, 
newTopics)));
@@ -131,7 +138,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                 return removeFuture;
             }
 
-            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(topics.size());
+            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
             removedTopics.stream().forEach(topic -> 
futures.add(removeConsumerAsync(topic)));
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> removeFuture.complete(null))
@@ -152,7 +159,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                 return addFuture;
             }
 
-            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(topics.size());
+            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
             addedTopics.stream().forEach(topic -> 
futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> addFuture.complete(null))
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index dc70d15..745aa1c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Sets;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
@@ -31,6 +35,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.internal.verification.VerificationModeFactory;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -63,9 +68,14 @@ public class MultiTopicsConsumerImplTest {
         conf.setStatsIntervalSeconds(100);
 
         ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+
+        @Cleanup("shutdown")
         EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
+
+        @Cleanup("shutdownNow")
         ExecutorProvider executorProvider = new ExecutorProvider(1, 
"client-test-stats");
 
+        @Cleanup
         PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
 
         ConsumerConfigurationData consumerConfData = new 
ConsumerConfigurationData();
@@ -169,4 +179,37 @@ public class MultiTopicsConsumerImplTest {
         verify(clientMock, times(1)).cleanupConsumer(any());
     }
 
+    @Test
+    public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() 
throws Exception {
+        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+        ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
+        consumerConfData.setSubscriptionName("subscriptionName");
+        consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", 
"c")));
+        consumerConfData.setAutoUpdatePartitionsIntervalSeconds(1);
+        consumerConfData.setAutoUpdatePartitions(true);
+
+        @Cleanup("stop")
+        Timer timer = new HashedWheelTimer();
+
+        PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx();
+        when(clientMock.timer()).thenReturn(timer);
+        when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        // Simulate non partitioned topics
+        PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
+        
when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata));
+        CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
+
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<>(
+                clientMock, consumerConfData, executorProvider,
+                completeFuture, Schema.BYTES, null, true);
+        impl.setState(HandlerState.State.Ready);
+        Thread.sleep(5000);
+
+        // getPartitionedTopicMetadata should have been called only the first 
time, for each of the 3 topics,
+        // but not anymore since the topics are not partitioned.
+        verify(clientMock, times(3)).getPartitionedTopicMetadata(any());
+    }
+
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
index 1f023de..024ca0a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
@@ -34,4 +34,9 @@ public class PartitionedTopicMetadata {
         this.partitions = partitions;
     }
 
+    /**
+     * A topic with '0' partitions is treated like a non-partitioned topic.
+     */
+    public static final int NON_PARTITIONED = 0;
+
 }

Reply via email to