This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b4788d6fe0c60267c3323614f9ce71beb1d4635
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 9 08:53:54 2024 +0800

    [fix][client] Fix pattern consumer create crash if a part of partitions of 
a topic have been deleted (#22854)
    
    (cherry picked from commit 9626e7e090e9481e12441a47cf7e89f209aadd03)
---
 .../apache/pulsar/broker/service/TopicGCTest.java  | 291 ++++++++++++++++++++-
 .../client/api/PatternMultiTopicsConsumerTest.java |  37 +++
 .../client/impl/PatternTopicsConsumerImplTest.java |  13 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |   5 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   6 +
 .../apache/pulsar/client/impl/LookupService.java   |   9 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 188 +++++++------
 .../client/impl/PatternConsumerUpdateQueue.java    | 254 ++++++++++++++++++
 .../impl/PatternMultiTopicsConsumerImpl.java       | 249 ++++++++++++------
 .../pulsar/client/impl/PulsarClientImpl.java       |  15 +-
 .../pulsar/client/impl/TopicListWatcher.java       |  16 +-
 .../impl/PatternConsumerUpdateQueueTest.java       | 247 +++++++++++++++++
 .../impl/PatternMultiTopicsConsumerImplTest.java   |   6 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   |  12 +-
 .../pulsar/common/lookup/GetTopicsResult.java      |  24 ++
 .../org/apache/pulsar/common/naming/TopicName.java |   9 +
 .../org/apache/pulsar/common/topics/TopicList.java |  15 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |   3 +
 18 files changed, 1200 insertions(+), 199 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index 8fdf0723ea8..172bd3702e1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -18,24 +18,34 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -58,14 +68,38 @@ public class TopicGCTest extends ProducerConsumerBase {
     @EqualsAndHashCode.Include
     protected void doInitConf() throws Exception {
         super.doInitConf();
-        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
-        
this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
-        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsMode(
+                InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
     }
 
-    @Test
-    public void testCreateConsumerAfterOnePartDeleted() throws Exception {
+    private enum SubscribeTopicType {
+        MULTI_PARTITIONED_TOPIC,
+        REGEX_TOPIC;
+    }
+
+    @DataProvider(name = "subscribeTopicTypes")
+    public Object[][] subTopicTypes() {
+        return new Object[][]{
+                {SubscribeTopicType.MULTI_PARTITIONED_TOPIC},
+                {SubscribeTopicType.REGEX_TOPIC}
+        };
+    }
+
+    private void setSubscribeTopic(ConsumerBuilder consumerBuilder, 
SubscribeTopicType subscribeTopicType,
+                                   String topicName, String topicPattern) {
+        if 
(subscribeTopicType.equals(SubscribeTopicType.MULTI_PARTITIONED_TOPIC)) {
+            consumerBuilder.topic(topicName);
+        } else {
+            consumerBuilder.topicsPattern(Pattern.compile(topicPattern));
+        }
+    }
+
+    @Test(dataProvider = "subscribeTopicTypes", timeOut = 300 * 1000)
+    public void testRecreateConsumerAfterOnePartGc(SubscribeTopicType 
subscribeTopicType) throws Exception {
         final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topicPattern = "persistent://public/default/tp.*";
         final String partition0 = topic + "-partition-0";
         final String partition1 = topic + "-partition-1";
         final String subscription = "s1";
@@ -77,8 +111,12 @@ public class TopicGCTest extends ProducerConsumerBase {
                 .enableBatching(false).create();
         Producer<String> producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(partition1)
                 .enableBatching(false).create();
-        org.apache.pulsar.client.api.Consumer<String> consumer1 = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
-                
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
+        ConsumerBuilder<String> consumerBuilder1 = 
pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared);
+        setSubscribeTopic(consumerBuilder1, subscribeTopicType, topic, 
topicPattern);
+        Consumer<String> consumer1 = consumerBuilder1.subscribe();
 
         // Make consume all messages for one topic, do not consume any 
messages for another one.
         producer0.send("1");
@@ -97,18 +135,247 @@ public class TopicGCTest extends ProducerConsumerBase {
         });
 
         // Verify that the consumer subscribed with partitioned topic can be 
created successful.
-        Consumer<String> consumerAllPartition = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
-                
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
-        Message<String> msg = consumerAllPartition.receive(2, 
TimeUnit.SECONDS);
+        ConsumerBuilder<String> consumerBuilder2 = 
pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared);
+        setSubscribeTopic(consumerBuilder2, subscribeTopicType, topic, 
topicPattern);
+        Consumer<String> consumer2 = consumerBuilder2.subscribe();
+        Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+        String receivedMsgValue = msg.getValue();
+        log.info("received msg: {}", receivedMsgValue);
+        consumer2.acknowledge(msg);
+
+        // cleanup.
+        consumer2.close();
+        producer0.close();
+        producer1.close();
+        admin.topics().deletePartitionedTopic(topic);
+    }
+
+    @Test(dataProvider = "subscribeTopicTypes", timeOut = 300 * 1000)
+    public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType 
subscribeTopicType) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topicPattern = "persistent://public/default/tp.*";
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String subscription = "s1";
+        admin.topics().createPartitionedTopic(topic, 2);
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+
+        // create consumers and producers.
+        Producer<String> producer0 = 
pulsarClient.newProducer(Schema.STRING).topic(partition0)
+                .enableBatching(false).create();
+        Producer<String> producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(partition1)
+                .enableBatching(false).create();
+        ConsumerBuilder<String> consumerBuilder1 = 
pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared);
+        setSubscribeTopic(consumerBuilder1, subscribeTopicType, topic, 
topicPattern);
+        Consumer<String> consumer1 = consumerBuilder1.subscribe();
+
+        // Make consume all messages for one topic, do not consume any 
messages for another one.
+        producer0.send("partition-0-1");
+        producer1.send("partition-1-1");
+        producer1.send("partition-1-2");
+        producer1.send("partition-1-4");
+        admin.topics().skipAllMessages(partition0, subscription);
+
+        // Wait for topic GC.
+        // Partition 0 will be deleted about 20s later, left 2min to avoid 
flaky.
+        producer0.close();
+        Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
+            CompletableFuture<Optional<Topic>> tp1 = 
pulsar.getBrokerService().getTopic(partition0, false);
+            CompletableFuture<Optional<Topic>> tp2 = 
pulsar.getBrokerService().getTopic(partition1, false);
+            assertTrue(tp1 == null || !tp1.get().isPresent());
+            assertTrue(tp2 != null && tp2.get().isPresent());
+        });
+
+        // Verify that the messages under "partition-1" still can be ack.
+        for (int i = 0; i < 2; i++) {
+            Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
+            assertNotNull(msg, "Expected at least received 2 messages.");
+            log.info("received msg[{}]: {}", i, msg.getValue());
+            TopicMessageId messageId = (TopicMessageId) msg.getMessageId();
+            if (messageId.getOwnerTopic().equals(partition1)) {
+                consumer1.acknowledgeAsync(msg);
+            }
+        }
+        consumer1.close();
+
+        // Verify that the consumer subscribed with partitioned topic can be 
created successful.
+        ConsumerBuilder<String> consumerBuilder2 = 
pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared);
+        setSubscribeTopic(consumerBuilder2, subscribeTopicType, topic, 
topicPattern);
+        Consumer<String> consumer2 = consumerBuilder2.subscribe();
+        producer1.send("partition-1-5");
+        Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
         assertNotNull(msg);
         String receivedMsgValue = msg.getValue();
         log.info("received msg: {}", receivedMsgValue);
-        consumerAllPartition.acknowledge(msg);
+        consumer2.acknowledge(msg);
 
         // cleanup.
-        consumerAllPartition.close();
+        consumer2.close();
         producer0.close();
         producer1.close();
         admin.topics().deletePartitionedTopic(topic);
     }
+
+    @Test(timeOut = 180 * 1000)
+    public void testPhasePartDeletion() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topicPattern = "persistent://public/default/tp.*";
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String partition2 = topic + "-partition-2";
+        final String subscription = "s1";
+        admin.topics().createPartitionedTopic(topic, 3);
+        // Create consumer.
+        PatternMultiTopicsConsumerImpl<String> c1 = 
(PatternMultiTopicsConsumerImpl<String>) pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .topicsPattern(Pattern.compile(topicPattern)).subscribe();
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 3);
+            assertEquals(consumers.size(), 3);
+            assertTrue(consumers.containsKey(partition0));
+            assertTrue(consumers.containsKey(partition1));
+            assertTrue(consumers.containsKey(partition2));
+        });
+        // Delete partitions the first time.
+        admin.topics().delete(partition0, true);
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 3);
+            assertEquals(consumers.size(), 2);
+            assertTrue(consumers.containsKey(partition1));
+            assertTrue(consumers.containsKey(partition2));
+        });
+        // Delete partitions the second time.
+        admin.topics().delete(partition1, true);
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 3);
+            assertEquals(consumers.size(), 1);
+            assertTrue(consumers.containsKey(partition2));
+        });
+        // Delete partitions the third time.
+        admin.topics().delete(partition2, true);
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 0);
+            assertEquals(consumers.size(), 0);
+        });
+
+        // cleanup.
+        c1.close();
+        admin.topics().deletePartitionedTopic(topic);
+    }
+
+    @Test(timeOut = 180 * 1000)
+    public void testExpandPartitions() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topicPattern = "persistent://public/default/tp.*";
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String subscription = "s1";
+        admin.topics().createPartitionedTopic(topic, 2);
+        // Delete partitions.
+        admin.topics().delete(partition0, true);
+        admin.topics().delete(partition1, true);
+        // Create consumer.
+        PatternMultiTopicsConsumerImpl<String> c1 = 
(PatternMultiTopicsConsumerImpl<String>) pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName(subscription)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .topicsPattern(Pattern.compile(topicPattern)).subscribe();
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 0);
+            assertEquals(consumers.size(), 0);
+        });
+        // Trigger partitions creation.
+        pulsarClient.newConsumer(Schema.STRING).subscriptionName(subscription)
+                
.subscriptionType(SubscriptionType.Shared).topic(topic).subscribe().close();
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 2);
+            assertEquals(consumers.size(), 2);
+            assertTrue(consumers.containsKey(partition0));
+            assertTrue(consumers.containsKey(partition1));
+        });
+        // Expand partitions the first time.
+        admin.topics().updatePartitionedTopic(topic, 3);
+        final String partition2 = topic + "-partition-2";
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 3);
+            assertEquals(consumers.size(), 3);
+            assertTrue(consumers.containsKey(partition0));
+            assertTrue(consumers.containsKey(partition1));
+            assertTrue(consumers.containsKey(partition2));
+        });
+        // Expand partitions the second time.
+        admin.topics().updatePartitionedTopic(topic, 4);
+        final String partition3 = topic + "-partition-3";
+        // Check subscriptions.
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentHashMap<String, ConsumerImpl<?>> consumers
+                    = WhiteboxImpl.getInternalState(c1, "consumers");
+            ConcurrentHashMap<String, Integer> partitionedTopics
+                    = WhiteboxImpl.getInternalState(c1, "partitionedTopics");
+            assertEquals(partitionedTopics.size(), 1);
+            assertEquals(partitionedTopics.get(topic), 4);
+            assertEquals(consumers.size(), 4);
+            assertTrue(consumers.containsKey(partition0));
+            assertTrue(consumers.containsKey(partition1));
+            assertTrue(consumers.containsKey(partition2));
+            assertTrue(consumers.containsKey(partition3));
+        });
+
+        // cleanup.
+        c1.close();
+        admin.topics().deletePartitionedTopic(topic);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
index 00a47c39571..475477ac521 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.fail;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -95,4 +98,38 @@ public class PatternMultiTopicsConsumerTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test(timeOut = 30000)
+    public void testFailedSubscribe() throws Exception {
+        final String topicName1 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
+        final String topicName2 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
+        final String topicName3 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
+        final String subName = "s1";
+        admin.topics().createPartitionedTopic(topicName1, 2);
+        admin.topics().createPartitionedTopic(topicName2, 3);
+        admin.topics().createNonPartitionedTopic(topicName3);
+
+        // Register a exclusive consumer to makes the pattern consumer failed 
to subscribe.
+        Consumer c1 = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName3).subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName).subscribe();
+
+        try {
+            PatternMultiTopicsConsumerImpl<String> consumer =
+                (PatternMultiTopicsConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                    .topicsPattern("persistent://public/default/tp_test.*")
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscriptionName(subName)
+                    .subscribe();
+            fail("Expected a consumer busy error.");
+        } catch (Exception ex) {
+            log.info("consumer busy", ex);
+        }
+
+        c1.close();
+        // Verify all internal consumer will be closed.
+        // If delete topic without "-f" work, it means the internal consumers 
were closed.
+        admin.topics().delete(topicName3);
+        admin.topics().deletePartitionedTopic(topicName2);
+        admin.topics().deletePartitionedTopic(topicName1);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 94d78e418ab..3bfecae23a5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -35,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
-import io.netty.util.Timeout;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
@@ -53,6 +51,7 @@ import 
org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -1024,17 +1023,17 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 6. remove producer 1,3; verify only consumer 2 left
         // seems no direct way to verify auto-unsubscribe, because this 
patternConsumer also referenced the topic.
-        List<String> topicNames = Lists.newArrayList(topicName2);
+        String tp2p0 = TopicName.get(topicName2).getPartition(0).toString();
+        String tp2p1 = TopicName.get(topicName2).getPartition(1).toString();
+        List<String> topicNames = Lists.newArrayList(tp2p0, tp2p1);
         NamespaceService nss = pulsar.getNamespaceService();
         doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
                 
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
 
         // 7. call recheckTopics to unsubscribe topic 1,3, verify topics 
number: 2=6-1-3
         log.debug("recheck topics change");
-        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
-        Timeout recheckPatternTimeout = 
spy(consumer1.getRecheckPatternTimeout());
-        doReturn(false).when(recheckPatternTimeout).isCancelled();
-        consumer1.run(recheckPatternTimeout);
+        PatternConsumerUpdateQueue taskQueue = 
WhiteboxImpl.getInternalState(consumer, "updateTaskQueue");
+        taskQueue.appendRecheckOp();
         Thread.sleep(100);
         assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitions().size(), 2);
         assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index c343ab0d6e2..83cb5f2a440 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.netty.util.Timeout;
+import java.time.Duration;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -1321,7 +1322,6 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);
 
         admin.topics().updatePartitionedTopic(topicName0, 5);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
 
         Awaitility.await().untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
@@ -1341,9 +1341,8 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         });
 
         admin.topics().updatePartitionedTopic(topicName1, 5);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
 
-        Awaitility.await().untilAsserted(() -> {
+        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
             Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
             Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 
10);
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 05081dcaa07..74abb82bfe8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.util.Timeout;
 import java.nio.charset.StandardCharsets;
@@ -1285,5 +1286,10 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         return batchReceiveTimeout != null;
     }
 
+    @VisibleForTesting
+    CompletableFuture<Consumer<T>> getSubscribeFuture() {
+        return subscribeFuture;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index ccd1f6b23f2..2fe457059c1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -117,7 +117,14 @@ public interface LookupService extends AutoCloseable {
     InetSocketAddress resolveHost();
 
     /**
-     * Returns all the topics name for a given namespace.
+     * Returns all the topics that matches {@param topicPattern} for a given 
namespace.
+     *
+     * Note: {@param topicPattern} it relate to the topic name(without the 
partition suffix). For example:
+     *  - There is a partitioned topic "tp-a" with two partitions.
+     *    - tp-a-partition-0
+     *    - tp-a-partition-1
+     *  - If {@param topicPattern} is "tp-a", the consumer will subscribe to 
the two partitions.
+     *  - if {@param topicPattern} is "tp-a-partition-0", the consumer will 
subscribe nothing.
      *
      * @param namespace : namespace-name
      * @return
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8047e05351a..e8cbf71e500 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import javax.annotation.Nullable;
@@ -68,6 +70,7 @@ import 
org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
@@ -81,14 +84,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     public static final String DUMMY_TOPIC_NAME_PREFIX = 
"MultiTopicsConsumer-";
 
     // Map <topic+partition, consumer>, when get do ACK, consumer will by find 
by topic name
-    private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
+    protected final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
     // Map <topic, numPartitions>, store partition number for each topic
     protected final ConcurrentHashMap<String, Integer> partitionedTopics;
 
     // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
     // shared incoming queue was full
-    private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
+    protected final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
 
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
     AtomicInteger allTopicPartitionsNumber;
@@ -1009,8 +1012,12 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     new PulsarClientException.AlreadyClosedException("Topic 
name not valid"));
         }
         String fullTopicName = topicNameInstance.toString();
-        if (consumers.containsKey(fullTopicName)
-                || 
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
+        if (consumers.containsKey(fullTopicName)) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.AlreadyClosedException("Already 
subscribed to " + topicName));
+        }
+        if (!topicNameInstance.isPartitioned()
+                && 
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.AlreadyClosedException("Already 
subscribed to " + topicName));
         }
@@ -1047,7 +1054,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, numPartitions);
         }
 
-        List<CompletableFuture<Consumer<T>>> futureList;
+        CompletableFuture<Void> subscribeAllPartitionsFuture;
         if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) {
             // Below condition is true if subscribeAsync() has been invoked 
second time with same
             // topicName before the first invocation had reached this point.
@@ -1067,30 +1074,50 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
             configurationData.setReceiverQueueSize(receiverQueueSize);
 
-            futureList = IntStream
-                .range(0, numPartitions)
-                .mapToObj(
-                    partitionIndex -> {
-                        String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
-                        CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
-                                partitionIndex, subFuture, 
createIfDoesNotExist, schema);
-                        synchronized (pauseMutex) {
-                            if (paused) {
-                                newConsumer.pause();
-                            } else {
-                                newConsumer.resume();
-                            }
-                            consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
+            CompletableFuture<List<Integer>> partitionsFuture;
+            if (createIfDoesNotExist || 
!TopicName.get(topicName).isPersistent()) {
+                partitionsFuture = 
CompletableFuture.completedFuture(IntStream.range(0, numPartitions)
+                        .mapToObj(i -> Integer.valueOf(i))
+                        .collect(Collectors.toList()));
+            } else {
+                partitionsFuture = getExistsPartitions(topicName.toString());
+            }
+            subscribeAllPartitionsFuture = 
partitionsFuture.thenCompose(partitions -> {
+                if (partitions.isEmpty()) {
+                    partitionedTopics.remove(topicName, numPartitions);
+                    return CompletableFuture.completedFuture(null);
+                }
+                List<CompletableFuture<Consumer<T>>> subscribeList = new 
ArrayList<>();
+                for (int partitionIndex : partitions) {
+                    String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
+                    CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+                    configurationData.setStartPaused(paused);
+                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                            partitionIndex, subFuture, createIfDoesNotExist, 
schema);
+                    synchronized (pauseMutex) {
+                        if (paused) {
+                            newConsumer.pause();
+                        } else {
+                            newConsumer.resume();
                         }
-                        return subFuture;
-                    })
-                .collect(Collectors.toList());
+                        Consumer originalValue = 
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+                        if (originalValue != null) {
+                            newConsumer.closeAsync().exceptionally(ex -> {
+                                log.error("[{}] [{}] Failed to close the 
orphan consumer",
+                                        partitionName, subscription, ex);
+                                return null;
+                            });
+                        }
+                    }
+                    subscribeList.add(subFuture);
+                }
+                return FutureUtil.waitForAll(subscribeList);
+            });
         } else {
             allTopicPartitionsNumber.incrementAndGet();
 
-            CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+            CompletableFuture<Consumer<T>> subscribeFuture = new 
CompletableFuture<>();
+            subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ -> 
{});
 
             synchronized (pauseMutex) {
                 consumers.compute(topicName, (key, existingValue) -> {
@@ -1104,7 +1131,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     } else {
                         internalConfig.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
-                                -1, subFuture, createIfDoesNotExist, schema);
+                                -1, subscribeFuture, createIfDoesNotExist, 
schema);
                         if (paused) {
                             newConsumer.pause();
                         } else {
@@ -1114,11 +1141,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     }
                 });
             }
-            futureList = Collections.singletonList(subFuture);
+
         }
 
-        FutureUtil.waitForAll(futureList)
-            .thenAccept(finalFuture -> {
+        subscribeAllPartitionsFuture.thenAccept(finalFuture -> {
                 if (allTopicPartitionsNumber.get() > 
getCurrentReceiverQueueSize()) {
                     
setCurrentReceiverQueueSize(allTopicPartitionsNumber.get());
                 }
@@ -1139,6 +1165,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return;
             })
             .exceptionally(ex -> {
+                log.warn("[{}] Failed to subscribe for topic [{}] in topics 
consumer {}", topic, topicName,
+                        ex.getMessage());
                 handleSubscribeOneTopicError(topicName, ex, subscribeResult);
                 return null;
             });
@@ -1162,7 +1190,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     // handling failure during subscribe new topic, unsubscribe success 
created partitions
-    private void handleSubscribeOneTopicError(String topicName,
+    protected void handleSubscribeOneTopicError(String topicName,
                                               Throwable error,
                                               CompletableFuture<Void> 
subscribeFuture) {
         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
{}", topic, topicName, error.getMessage());
@@ -1255,59 +1283,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return unsubscribeFuture;
     }
 
-    /***
-     * Remove a consumer for a topic.
-     * @param topicName topic name contains the partition suffix.
-     */
-    public CompletableFuture<Void> removeConsumerAsync(String topicName) {
-        checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + 
topicName);
-
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil.failedFuture(
-                new PulsarClientException.AlreadyClosedException("Topics 
Consumer was already closed"));
-        }
-
-        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        String topicPartName = 
TopicName.get(topicName).getPartitionedTopicName();
-
-
-        List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
-            .filter(consumer -> {
-                String consumerTopicName = consumer.getTopic();
-                return 
TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName);
-            }).collect(Collectors.toList());
-
-        List<CompletableFuture<Void>> futureList = consumersToClose.stream()
-            .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
-
-        FutureUtil.waitForAll(futureList)
-            .whenComplete((r, ex) -> {
-                if (ex == null) {
-                    consumersToClose.forEach(consumer1 -> {
-                        consumers.remove(consumer1.getTopic());
-                        pausedConsumers.remove(consumer1);
-                        allTopicPartitionsNumber.decrementAndGet();
-                    });
-
-                    removeTopic(topicName);
-                    if (unAckedMessageTracker instanceof 
UnAckedTopicMessageTracker) {
-                        ((UnAckedTopicMessageTracker) 
unAckedMessageTracker).removeTopicMessages(topicName);
-                    }
-
-                    unsubscribeFuture.complete(null);
-                    log.info("[{}] [{}] [{}] Removed Topics Consumer, 
allTopicPartitionsNumber: {}",
-                        topicName, subscription, consumerName, 
allTopicPartitionsNumber);
-                } else {
-                    unsubscribeFuture.completeExceptionally(ex);
-                    setState(State.Failed);
-                    log.error("[{}] [{}] [{}] Could not remove Topics 
Consumer",
-                        topicName, subscription, consumerName, ex.getCause());
-                }
-            });
-
-        return unsubscribeFuture;
-    }
-
 
     // get topics name
     public List<String> getPartitionedTopics() {
@@ -1573,4 +1548,51 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
         resumeReceivingFromPausedConsumersIfNeeded();
     }
+
+    /**
+     * Get the exists partitions of a partitioned topic, the result does not 
contain the partitions which has not been
+     * created yet(in other words, the partitions that do not exist in the 
response of "pulsar-admin topics list").
+     * @return sorted partitions list if it is a partitioned topic; @return an 
empty list if it is a non-partitioned
+     * topic.
+     */
+    private CompletableFuture<List<Integer>> getExistsPartitions(String topic) 
{
+        TopicName topicName = TopicName.get(topic);
+        if (!topicName.isPersistent()) {
+            return FutureUtil.failedFuture(new IllegalArgumentException("The 
method getExistsPartitions"
+                    + " does not support non-persistent topic yet."));
+        }
+        return 
client.getLookup().getTopicsUnderNamespace(topicName.getNamespaceObject(),
+                CommandGetTopicsOfNamespace.Mode.PERSISTENT,
+                TopicName.getPattern(topicName.getPartitionedTopicName()),
+                null).thenApply(getTopicsResult -> {
+            if (getTopicsResult.getNonPartitionedOrPartitionTopics() == null
+                    || 
getTopicsResult.getNonPartitionedOrPartitionTopics().isEmpty()) {
+                return Collections.emptyList();
+            }
+            // If broker version is less than "2.11.x", it does not support 
broker-side pattern check, so append
+            // a client-side pattern check.
+            // If lookup service is typed HttpLookupService, the HTTP API does 
not support broker-side pattern
+            // check yet, so append a client-side pattern check.
+            Predicate<String> clientSideFilter;
+            if (getTopicsResult.isFiltered()) {
+                clientSideFilter = __ -> true;
+            } else {
+                clientSideFilter =
+                        tp -> 
Pattern.compile(TopicName.getPartitionPattern(topic)).matcher(tp).matches();
+            }
+            ArrayList<Integer> list = new 
ArrayList<>(getTopicsResult.getNonPartitionedOrPartitionTopics().size());
+            for (String partition : 
getTopicsResult.getNonPartitionedOrPartitionTopics()) {
+                int partitionIndex = 
TopicName.get(partition).getPartitionIndex();
+                if (partitionIndex < 0) {
+                    // It is not a partition.
+                    continue;
+                }
+                if (clientSideFilter.test(partition)) {
+                    list.add(partitionIndex);
+                }
+            }
+            Collections.sort(list);
+            return list;
+        });
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java
new file mode 100644
index 00000000000..d6eba6463a0
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Used to make all tasks that will modify subscriptions will be executed one 
by one, and skip the unnecessary updating.
+ *
+ * So far, four three scenarios that will modify subscriptions:
+ * 1. When start pattern consumer.
+ * 2. After topic list watcher reconnected, it will call {@link 
PatternMultiTopicsConsumerImpl#recheckTopicsChange()}.
+ *    this scenario only exists in the version >= 2.11 (both client-version 
and broker version are >= 2.11).
+ * 3. A scheduled task will call {@link 
PatternMultiTopicsConsumerImpl#recheckTopicsChange()}, this scenario only
+ *    exists in the version < 2.11.
+ * 4. The topics change events will trigger a
+ *    {@link 
PatternMultiTopicsConsumerImpl#topicsChangeListener#onTopicsRemoved(Collection)}
 or
+ *    {@link 
PatternMultiTopicsConsumerImpl#topicsChangeListener#onTopicsAdded(Collection)}.
+ *
+ * When you are using this client connect to the broker whose version >= 2.11, 
there are three scenarios: [1, 2, 4].
+ * When you are using this client connect to the broker whose version < 2.11, 
there is only one scenario: [3] and all
+ *   the event will run in the same thread.
+ */
+@Slf4j
+@SuppressFBWarnings("EI_EXPOSE_REP2")
+public class PatternConsumerUpdateQueue {
+
+    private static final Pair<UpdateSubscriptionType, Collection<String>> 
RECHECK_OP =
+            Pair.of(UpdateSubscriptionType.RECHECK, null);
+
+    private final LinkedBlockingQueue<Pair<UpdateSubscriptionType, 
Collection<String>>> pendingTasks;
+
+    private final PatternMultiTopicsConsumerImpl patternConsumer;
+
+    private final PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener;
+
+    /**
+     * Whether there is a task is in progress, this variable is used to 
confirm whether a next-task triggering is
+     * needed.
+     */
+    private Pair<UpdateSubscriptionType, CompletableFuture<Void>> 
taskInProgress = null;
+
+    /**
+     * Whether there is a recheck task in queue.
+     * - Since recheck task will do all changes, it can be used to compress 
multiple tasks to one.
+     * - To avoid skipping the newest changes, once the recheck task is 
starting to work, this variable will be set
+     *   to "false".
+     */
+    private boolean recheckTaskInQueue = false;
+
+    private volatile long lastRecheckTaskStartingTimestamp = 0;
+
+    private boolean closed;
+
+    public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl 
patternConsumer) {
+        this(patternConsumer, patternConsumer.topicsChangeListener);
+    }
+
+    /** This constructor is only for test. **/
+    @VisibleForTesting
+    public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl 
patternConsumer,
+                                      
PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener) {
+        this.patternConsumer = patternConsumer;
+        this.topicsChangeListener = topicsChangeListener;
+        this.pendingTasks = new LinkedBlockingQueue<>();
+        // To avoid subscribing and topics changed events execute 
concurrently, let the change events starts after the
+        // subscribing task.
+        doAppend(Pair.of(UpdateSubscriptionType.CONSUMER_INIT, null));
+    }
+
+    synchronized void appendTopicsAddedOp(Collection<String> topics) {
+        if (topics == null || topics.isEmpty()) {
+            return;
+        }
+        doAppend(Pair.of(UpdateSubscriptionType.TOPICS_ADDED, topics));
+    }
+
+    synchronized void appendTopicsRemovedOp(Collection<String> topics) {
+        if (topics == null || topics.isEmpty()) {
+            return;
+        }
+        doAppend(Pair.of(UpdateSubscriptionType.TOPICS_REMOVED, topics));
+    }
+
+    synchronized void appendRecheckOp() {
+        doAppend(RECHECK_OP);
+    }
+
+    synchronized void doAppend(Pair<UpdateSubscriptionType, 
Collection<String>> task) {
+        if (log.isDebugEnabled()) {
+            log.debug("Pattern consumer [{}] try to append task. {} {}", 
patternConsumer.getSubscription(),
+                    task.getLeft(), task.getRight() == null ? "" : 
task.getRight());
+        }
+        // Once there is a recheck task in queue, it means other tasks can be 
skipped.
+        if (recheckTaskInQueue) {
+            return;
+        }
+
+        // Once there are too many tasks in queue, compress them as a recheck 
task.
+        if (pendingTasks.size() >= 30 && 
!task.getLeft().equals(UpdateSubscriptionType.RECHECK)) {
+            appendRecheckOp();
+            return;
+        }
+
+        pendingTasks.add(task);
+        if (task.getLeft().equals(UpdateSubscriptionType.RECHECK)) {
+            recheckTaskInQueue = true;
+        }
+
+        // If no task is in-progress, trigger a task execution.
+        if (taskInProgress == null) {
+            triggerNextTask();
+        }
+    }
+
+    synchronized void triggerNextTask() {
+        if (closed) {
+            return;
+        }
+
+        final Pair<UpdateSubscriptionType, Collection<String>> task = 
pendingTasks.poll();
+
+        // No pending task.
+        if (task == null) {
+            taskInProgress = null;
+            return;
+        }
+
+        // If there is a recheck task in queue, skip others and only call the 
recheck task.
+        if (recheckTaskInQueue && 
!task.getLeft().equals(UpdateSubscriptionType.RECHECK)) {
+            triggerNextTask();
+            return;
+        }
+
+        // Execute pending task.
+        CompletableFuture<Void> newTaskFuture = null;
+        switch (task.getLeft()) {
+            case CONSUMER_INIT: {
+                newTaskFuture = 
patternConsumer.getSubscribeFuture().thenAccept(__ -> {}).exceptionally(ex -> {
+                    // If the subscribe future was failed, the consumer will 
be closed.
+                    synchronized (PatternConsumerUpdateQueue.this) {
+                        this.closed = true;
+                        patternConsumer.closeAsync().exceptionally(ex2 -> {
+                            log.error("Pattern consumer failed to close, this 
error may left orphan consumers."
+                                    + " Subscription: {}", 
patternConsumer.getSubscription());
+                            return null;
+                        });
+                    }
+                    return null;
+                });
+                break;
+            }
+            case TOPICS_ADDED: {
+                newTaskFuture = 
topicsChangeListener.onTopicsAdded(task.getRight());
+                break;
+            }
+            case TOPICS_REMOVED: {
+                newTaskFuture = 
topicsChangeListener.onTopicsRemoved(task.getRight());
+                break;
+            }
+            case RECHECK: {
+                recheckTaskInQueue = false;
+                lastRecheckTaskStartingTimestamp = System.currentTimeMillis();
+                newTaskFuture = patternConsumer.recheckTopicsChange();
+                break;
+            }
+            default: {
+                throw new RuntimeException("Un-support 
UpdateSubscriptionType");
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Pattern consumer [{}] starting task. {} {} ", 
patternConsumer.getSubscription(),
+                    task.getLeft(), task.getRight() == null ? "" : 
task.getRight());
+        }
+        // Trigger next pending task.
+        taskInProgress = Pair.of(task.getLeft(), newTaskFuture);
+        newTaskFuture.thenAccept(ignore -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Pattern consumer [{}] task finished. {} {} ", 
patternConsumer.getSubscription(),
+                        task.getLeft(), task.getRight() == null ? "" : 
task.getRight());
+            }
+            triggerNextTask();
+        }).exceptionally(ex -> {
+            /**
+             * Once a updating fails, trigger a delayed new recheck task to 
guarantee all things is correct.
+             * - Skip if there is already a recheck task in queue.
+             * - Skip if the last recheck task has been executed after the 
current time.
+             */
+            log.error("Pattern consumer [{}] task finished. {} {}. But it 
failed", patternConsumer.getSubscription(),
+                    task.getLeft(), task.getRight() == null ? "" : 
task.getRight(), ex);
+            // Skip if there is already a recheck task in queue.
+            synchronized (PatternConsumerUpdateQueue.this) {
+                if (recheckTaskInQueue || 
PatternConsumerUpdateQueue.this.closed) {
+                    return null;
+                }
+            }
+            // Skip if the last recheck task has been executed after the 
current time.
+            long failedTime = System.currentTimeMillis();
+            patternConsumer.getClient().timer().newTimeout(timeout -> {
+                if (lastRecheckTaskStartingTimestamp <= failedTime) {
+                    appendRecheckOp();
+                }
+            }, 10, TimeUnit.SECONDS);
+            triggerNextTask();
+            return null;
+        });
+    }
+
+    public synchronized CompletableFuture<Void> 
cancelAllAndWaitForTheRunningTask() {
+        this.closed = true;
+        if (taskInProgress == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // If the in-progress task is consumer init task, it means nothing is 
in-progress.
+        if 
(taskInProgress.getLeft().equals(UpdateSubscriptionType.CONSUMER_INIT)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return taskInProgress.getRight().thenAccept(__ -> {}).exceptionally(ex 
-> null);
+    }
+
+    private enum UpdateSubscriptionType {
+        /** A marker that indicates the consumer's subscribe task.**/
+        CONSUMER_INIT,
+        /** Triggered by {@link 
PatternMultiTopicsConsumerImpl#topicsChangeListener}.**/
+        TOPICS_ADDED,
+        /** Triggered by {@link 
PatternMultiTopicsConsumerImpl#topicsChangeListener}.**/
+        TOPICS_REMOVED,
+        /** A fully check for pattern consumer. **/
+        RECHECK;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index ec7ff7930c0..91c6da26d59 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -27,13 +27,13 @@ import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -42,6 +42,7 @@ import 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.BackoffBuilder;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
 
 public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T> implements TimerTask {
     private final Pattern topicsPattern;
-    private final TopicsChangedListener topicsChangeListener;
+    final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
     private final CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
     protected NamespaceName namespaceName;
@@ -69,6 +70,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     private volatile Timeout recheckPatternTimeout = null;
     private volatile String topicsHash;
 
+    private PatternConsumerUpdateQueue updateTaskQueue;
+
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
@@ -98,20 +101,23 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         
checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString()));
 
         this.topicsChangeListener = new PatternTopicsChangedListener();
+        this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
         this.recheckPatternTimeout = client.timer()
                 .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
-            new TopicListWatcher(topicsChangeListener, client, topicsPattern, 
watcherId,
+            new TopicListWatcher(updateTaskQueue, client, topicsPattern, 
watcherId,
                 namespaceName, topicsHash, watcherFuture, () -> 
recheckTopicsChangeAfterReconnect());
             watcherFuture
                .thenAccept(__ -> recheckPatternTimeout.cancel())
                .exceptionally(ex -> {
-                   log.warn("Unable to create topic list watcher. Falling back 
to only polling for new topics", ex);
+                   log.warn("Pattern consumer [{}] unable to create topic list 
watcher. Falling back to only polling"
+                           + " for new topics", conf.getSubscriptionName(), 
ex);
                    return null;
                });
         } else {
-            log.debug("Not creating topic list watcher for subscription mode 
{}", subscriptionMode);
+            log.debug("Pattern consumer [{}] not creating topic list watcher 
for subscription mode {}",
+                    conf.getSubscriptionName(), subscriptionMode);
             watcherFuture.complete(null);
         }
     }
@@ -129,17 +135,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             return;
         }
         // Do check.
-        recheckTopicsChange().whenComplete((ignore, ex) -> {
-            if (ex != null) {
-                log.warn("[{}] Failed to recheck topics change: {}", topic, 
ex.getMessage());
-                long delayMs = recheckPatternTaskBackoff.next();
-                client.timer().newTimeout(timeout -> {
-                    recheckTopicsChangeAfterReconnect();
-                }, delayMs, TimeUnit.MILLISECONDS);
-            } else {
-                recheckPatternTaskBackoff.reset();
-            }
-        });
+        updateTaskQueue.appendRecheckOp();
     }
 
     // TimerTask to recheck topics change, and trigger subscribe/unsubscribe 
based on the change.
@@ -148,18 +144,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         if (timeout.isCancelled()) {
             return;
         }
-        recheckTopicsChange().exceptionally(ex -> {
-            log.warn("[{}] Failed to recheck topics change: {}", topic, 
ex.getMessage());
-            return null;
-        }).thenAccept(__ -> {
-            // schedule the next re-check task
-            this.recheckPatternTimeout = client.timer()
-                    .newTimeout(PatternMultiTopicsConsumerImpl.this,
-                    Math.max(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.SECONDS);
-        });
+        updateTaskQueue.appendRecheckOp();
     }
 
-    private CompletableFuture<Void> recheckTopicsChange() {
+    CompletableFuture<Void> recheckTopicsChange() {
         String pattern = topicsPattern.pattern();
         final int epoch = recheckPatternEpoch.incrementAndGet();
         return client.getLookup().getTopicsUnderNamespace(namespaceName, 
subscriptionMode, pattern, topicsHash)
@@ -172,22 +160,18 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                         return CompletableFuture.completedFuture(null);
                     }
                     if (log.isDebugEnabled()) {
-                        log.debug("Get topics under namespace {}, topics.size: 
{}, topicsHash: {}, filtered: {}",
+                        log.debug("Pattern consumer [{}] get topics under 
namespace {}, topics.size: {},"
+                                        + " topicsHash: {}, filtered: {}",
+                                
PatternMultiTopicsConsumerImpl.this.getSubscription(),
                                 namespaceName, 
getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
                                 getTopicsResult.isFiltered());
                         getTopicsResult.getTopics().forEach(topicName ->
                                 log.debug("Get topics under namespace {}, 
topic: {}", namespaceName, topicName));
                     }
 
-                    final List<String> oldTopics = new 
ArrayList<>(getPartitionedTopics());
-                    for (String partition : getPartitions()) {
-                        TopicName topicName = TopicName.get(partition);
-                        if (!topicName.isPartitioned() || 
!oldTopics.contains(topicName.getPartitionedTopicName())) {
-                            oldTopics.add(partition);
-                        }
-                    }
+                    final List<String> oldTopics = new 
ArrayList<>(getPartitions());
                     return updateSubscriptions(topicsPattern, 
this::setTopicsHash, getTopicsResult,
-                            topicsChangeListener, oldTopics);
+                            topicsChangeListener, oldTopics, subscription);
                 }
             });
     }
@@ -196,7 +180,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                                                        
java.util.function.Consumer<String> topicsHashSetter,
                                                        GetTopicsResult 
getTopicsResult,
                                                        TopicsChangedListener 
topicsChangedListener,
-                                                       List<String> oldTopics) 
{
+                                                       List<String> oldTopics,
+                                                       String 
subscriptionForLog) {
         topicsHashSetter.accept(getTopicsResult.getTopicsHash());
         if (!getTopicsResult.isChanged()) {
             return CompletableFuture.completedFuture(null);
@@ -204,14 +189,20 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
 
         List<String> newTopics;
         if (getTopicsResult.isFiltered()) {
-            newTopics = getTopicsResult.getTopics();
+            newTopics = getTopicsResult.getNonPartitionedOrPartitionTopics();
         } else {
-            newTopics = TopicList.filterTopics(getTopicsResult.getTopics(), 
topicsPattern);
+            newTopics = 
getTopicsResult.filterTopics(topicsPattern).getNonPartitionedOrPartitionTopics();
         }
 
         final List<CompletableFuture<?>> listenersCallback = new 
ArrayList<>(2);
-        
listenersCallback.add(topicsChangedListener.onTopicsAdded(TopicList.minus(newTopics,
 oldTopics)));
-        
listenersCallback.add(topicsChangedListener.onTopicsRemoved(TopicList.minus(oldTopics,
 newTopics)));
+        Set<String> topicsAdded = TopicList.minus(newTopics, oldTopics);
+        Set<String> topicsRemoved = TopicList.minus(oldTopics, newTopics);
+        if (log.isDebugEnabled()) {
+            log.debug("Pattern consumer [{}] Recheck pattern consumer's 
topics. topicsAdded: {}, topicsRemoved: {}",
+                    subscriptionForLog, topicsAdded, topicsRemoved);
+        }
+        
listenersCallback.add(topicsChangedListener.onTopicsAdded(topicsAdded));
+        
listenersCallback.add(topicsChangedListener.onTopicsRemoved(topicsRemoved));
         return 
FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
     }
 
@@ -247,23 +238,68 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
          */
         @Override
         public CompletableFuture<Void> onTopicsRemoved(Collection<String> 
removedTopics) {
-            CompletableFuture<Void> removeFuture = new CompletableFuture<>();
-
             if (removedTopics.isEmpty()) {
-                removeFuture.complete(null);
-                return removeFuture;
+                return CompletableFuture.completedFuture(null);
             }
 
-            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
-            removedTopics.stream().forEach(topic -> 
futures.add(removeConsumerAsync(topic)));
-            FutureUtil.waitForAll(futures)
-                .thenAccept(finalFuture -> removeFuture.complete(null))
-                .exceptionally(ex -> {
-                    log.warn("[{}] Failed to unsubscribe from topics: {}", 
topic, ex.getMessage());
-                    removeFuture.completeExceptionally(ex);
+            // Unsubscribe and remove consumers in memory.
+            List<CompletableFuture<Void>> unsubscribeList = new 
ArrayList<>(removedTopics.size());
+            Set<String> partialRemoved = new HashSet<>(removedTopics.size());
+            Set<String> partialRemovedForLog = new 
HashSet<>(removedTopics.size());
+            for (String tp : removedTopics) {
+                TopicName topicName = TopicName.get(tp);
+                ConsumerImpl<T> consumer = consumers.get(topicName.toString());
+                if (consumer != null) {
+                    CompletableFuture<Void> unsubscribeFuture = new 
CompletableFuture<>();
+                    consumer.closeAsync().whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Pattern consumer [{}] failed to 
unsubscribe from topics: {}",
+                                    
PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString(), 
ex);
+                            unsubscribeFuture.completeExceptionally(ex);
+                        } else {
+                            consumers.remove(topicName.toString(), consumer);
+                            unsubscribeFuture.complete(null);
+                        }
+                    });
+                    unsubscribeList.add(unsubscribeFuture);
+                    partialRemoved.add(topicName.getPartitionedTopicName());
+                    partialRemovedForLog.add(topicName.toString());
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Pattern consumer [{}] remove topics. {}",
+                        PatternMultiTopicsConsumerImpl.this.getSubscription(),
+                        partialRemovedForLog);
+            }
+
+            // Remove partitioned topics in memory.
+            return FutureUtil.waitForAll(unsubscribeList).handle((__, ex) -> {
+                List<String> removedPartitionedTopicsForLog = new 
ArrayList<>();
+                for (String groupedTopicRemoved : partialRemoved) {
+                    Integer partitions = 
partitionedTopics.get(groupedTopicRemoved);
+                    if (partitions != null) {
+                        boolean allPartitionsHasBeenRemoved = true;
+                        for (int i = 0; i < partitions; i++) {
+                            if (consumers.containsKey(
+                                    
TopicName.get(groupedTopicRemoved).getPartition(i).toString())) {
+                                allPartitionsHasBeenRemoved = false;
+                                break;
+                            }
+                        }
+                        if (allPartitionsHasBeenRemoved) {
+                            
removedPartitionedTopicsForLog.add(String.format("%s with %s partitions",
+                                    groupedTopicRemoved, partitions));
+                            partitionedTopics.remove(groupedTopicRemoved, 
partitions);
+                        }
+                    }
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Pattern consumer [{}] remove partitioned topics 
because all partitions have been"
+                                    + " removed. {}", 
PatternMultiTopicsConsumerImpl.this.getSubscription(),
+                            removedPartitionedTopicsForLog);
+                }
                 return null;
             });
-            return removeFuture;
         }
 
         /**
@@ -271,29 +307,90 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
          */
         @Override
         public CompletableFuture<Void> onTopicsAdded(Collection<String> 
addedTopics) {
-            CompletableFuture<Void> addFuture = new CompletableFuture<>();
-
             if (addedTopics.isEmpty()) {
-                addFuture.complete(null);
-                return addFuture;
+                return CompletableFuture.completedFuture(null);
             }
-
-            Set<String> addTopicPartitionedName = addedTopics.stream()
-                    .map(addTopicName -> 
TopicName.get(addTopicName).getPartitionedTopicName())
-                    .collect(Collectors.toSet());
-
-            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
-            addTopicPartitionedName.forEach(partitionedTopic -> futures.add(
-                    subscribeAsync(partitionedTopic,
-                            false /* createTopicIfDoesNotExist */)));
-            FutureUtil.waitForAll(futures)
-                .thenAccept(finalFuture -> addFuture.complete(null))
-                .exceptionally(ex -> {
-                    log.warn("[{}] Failed to subscribe to topics: {}", topic, 
ex.getMessage());
-                    addFuture.completeExceptionally(ex);
-                    return null;
+            List<CompletableFuture<Void>> futures = 
Lists.newArrayListWithExpectedSize(addedTopics.size());
+            /**
+             * Three normal cases:
+             *  1. Expand partitions.
+             *  2. Non-partitioned topic, but has been subscribing.
+             *  3. Non-partitioned topic or Partitioned topic, but has not 
been subscribing.
+             * Two unexpected cases:
+             *   Error-1: Received adding non-partitioned topic event, but has 
subscribed a partitioned topic with the
+             *     same name.
+             *   Error-2: Received adding partitioned topic event, but has 
subscribed a non-partitioned topic with the
+             *     same name.
+             *
+             * Note: The events that triggered by {@link 
TopicsPartitionChangedListener} after expanding partitions has
+             *    been disabled through "conf.setAutoUpdatePartitions(false)" 
when creating
+             *    {@link PatternMultiTopicsConsumerImpl}.
+             */
+            Set<String> groupedTopics = new HashSet<>();
+            List<String> expendPartitionsForLog = new ArrayList<>();
+            for (String tp : addedTopics) {
+                TopicName topicName = TopicName.get(tp);
+                groupedTopics.add(topicName.getPartitionedTopicName());
+            }
+            for (String tp : addedTopics) {
+                TopicName topicName = TopicName.get(tp);
+                // Case 1: Expand partitions.
+                if 
(partitionedTopics.containsKey(topicName.getPartitionedTopicName())) {
+                    if (consumers.containsKey(topicName.toString())) {
+                        // Already subscribed.
+                    } else if (topicName.getPartitionIndex() < 0) {
+                        // Error-1: Received adding non-partitioned topic 
event, but has subscribed a partitioned topic
+                        // with the same name.
+                        log.error("Pattern consumer [{}] skip to subscribe to 
the non-partitioned topic {}, because has"
+                                + "subscribed a partitioned topic with the 
same name",
+                                
PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString());
+                    } else {
+                        if (topicName.getPartitionIndex() + 1
+                                > 
partitionedTopics.get(topicName.getPartitionedTopicName())) {
+                            
partitionedTopics.put(topicName.getPartitionedTopicName(),
+                                    topicName.getPartitionIndex() + 1);
+                        }
+                        expendPartitionsForLog.add(topicName.toString());
+                        CompletableFuture consumerFuture = 
subscribeAsync(topicName.toString(),
+                                PartitionedTopicMetadata.NON_PARTITIONED);
+                        consumerFuture.whenComplete((__, ex) -> {
+                            if (ex != null) {
+                                log.warn("Pattern consumer [{}] Failed to 
subscribe to topics: {}",
+                                        
PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName, ex);
+                            }
+                        });
+                        futures.add(consumerFuture);
+                    }
+                    groupedTopics.remove(topicName.getPartitionedTopicName());
+                } else if (consumers.containsKey(topicName.toString())) {
+                    // Case-2: Non-partitioned topic, but has been subscribing.
+                    groupedTopics.remove(topicName.getPartitionedTopicName());
+                } else if 
(consumers.containsKey(topicName.getPartitionedTopicName())
+                        && topicName.getPartitionIndex() >= 0) {
+                    // Error-2: Received adding partitioned topic event, but 
has subscribed a non-partitioned topic
+                    // with the same name.
+                    log.error("Pattern consumer [{}] skip to subscribe to the 
partitioned topic {}, because has"
+                                    + "subscribed a non-partitioned topic with 
the same name",
+                            
PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName);
+                    groupedTopics.remove(topicName.getPartitionedTopicName());
+                }
+            }
+            // Case 3: Non-partitioned topic or Partitioned topic, which has 
not been subscribed.
+            for (String partitionedTopic : groupedTopics) {
+                CompletableFuture consumerFuture = 
subscribeAsync(partitionedTopic, false);
+                consumerFuture.whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        log.warn("Pattern consumer [{}] Failed to subscribe to 
topics: {}",
+                                
PatternMultiTopicsConsumerImpl.this.getSubscription(), partitionedTopic, ex);
+                    }
                 });
-            return addFuture;
+                futures.add(consumerFuture);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Pattern consumer [{}] add topics. expend partitions 
{}, new subscribing {}",
+                        PatternMultiTopicsConsumerImpl.this.getSubscription(), 
expendPartitionsForLog, groupedTopics);
+            }
+            return FutureUtil.waitForAll(futures);
         }
     }
 
@@ -313,7 +410,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                 closeFutures.add(watcher.closeAsync());
             }
         }
-        closeFutures.add(super.closeAsync());
+        
closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__
 -> super.closeAsync()));
         return FutureUtil.waitForAll(closeFutures);
     }
 
@@ -322,5 +419,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         return recheckPatternTimeout;
     }
 
+    protected void handleSubscribeOneTopicError(String topicName,
+                                                Throwable error,
+                                                CompletableFuture<Void> 
subscribeFuture) {
+        subscribeFuture.completeExceptionally(error);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b93786bb4b1..64b6e45cb7f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -583,12 +583,13 @@ public class PulsarClientImpl implements PulsarClient {
         lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, 
null)
             .thenAccept(getTopicsResult -> {
                 if (log.isDebugEnabled()) {
-                    log.debug("Get topics under namespace {}, topics.size: {},"
-                                    + " topicsHash: {}, changed: {}, filtered: 
{}",
+                    log.debug("Pattern consumer [{}] get topics under 
namespace {}, topics.size: {},"
+                                    + " topicsHash: {}, changed: {}, filtered: 
{}", conf.getSubscriptionName(),
                             namespaceName, getTopicsResult.getTopics().size(), 
getTopicsResult.getTopicsHash(),
                             getTopicsResult.isChanged(), 
getTopicsResult.isFiltered());
                     getTopicsResult.getTopics().forEach(topicName ->
-                        log.debug("Get topics under namespace {}, topic: {}", 
namespaceName, topicName));
+                        log.debug("Pattern consumer [{}] get topics under 
namespace {}, topic: {}",
+                                conf.getSubscriptionName(), namespaceName, 
topicName));
                 }
 
                 List<String> topicsList = getTopicsResult.getTopics();
@@ -596,6 +597,14 @@ public class PulsarClientImpl implements PulsarClient {
                    topicsList = 
TopicList.filterTopics(getTopicsResult.getTopics(), conf.getTopicsPattern());
                 }
                 conf.getTopicNames().addAll(topicsList);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Pattern consumer [{}] initialize topics. {}", 
conf.getSubscriptionName(),
+                            
getTopicsResult.getNonPartitionedOrPartitionTopics());
+                }
+
+                // Pattern consumer has his unique check mechanism, so do not 
need the feature "autoUpdatePartitions".
+                conf.setAutoUpdatePartitions(false);
                 ConsumerBase<T> consumer = new 
PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
                         getTopicsResult.getTopicsHash(),
                         PulsarClientImpl.this,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 4e635e0d2e8..9cb737bd73d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -43,7 +43,7 @@ public class TopicListWatcher extends HandlerState implements 
ConnectionHandler.
             AtomicLongFieldUpdater
                     .newUpdater(TopicListWatcher.class, 
"createWatcherDeadline");
 
-    private final PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener;
+    private final PatternConsumerUpdateQueue patternConsumerUpdateQueue;
     private final String name;
     private final ConnectionHandler connectionHandler;
     private final Pattern topicsPattern;
@@ -63,13 +63,13 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
-    public 
TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener,
+    public TopicListWatcher(PatternConsumerUpdateQueue 
patternConsumerUpdateQueue,
                             PulsarClientImpl client, Pattern topicsPattern, 
long watcherId,
                             NamespaceName namespace, String topicsHash,
                             CompletableFuture<TopicListWatcher> watcherFuture,
                             Runnable recheckTopicsChangeAfterReconnect) {
         super(client, topicsPattern.pattern());
-        this.topicsChangeListener = topicsChangeListener;
+        this.patternConsumerUpdateQueue = patternConsumerUpdateQueue;
         this.name = "Watcher(" + topicsPattern + ")";
         this.connectionHandler = new ConnectionHandler(this,
                 new BackoffBuilder()
@@ -277,13 +277,7 @@ public class TopicListWatcher extends HandlerState 
implements ConnectionHandler.
     }
 
     public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) {
-        List<String> deleted = update.getDeletedTopicsList();
-        if (!deleted.isEmpty()) {
-            topicsChangeListener.onTopicsRemoved(deleted);
-        }
-        List<String> added = update.getNewTopicsList();
-        if (!added.isEmpty()) {
-            topicsChangeListener.onTopicsAdded(added);
-        }
+        
patternConsumerUpdateQueue.appendTopicsRemovedOp(update.getDeletedTopicsList());
+        
patternConsumerUpdateQueue.appendTopicsAddedOp(update.getNewTopicsList());
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java
new file mode 100644
index 00000000000..01f0be6a85e
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.util.HashedWheelTimer;
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Test(groups = "utils")
+public class PatternConsumerUpdateQueueTest {
+
+    private QueueInstance createInstance(CompletableFuture<Void> 
customizedRecheckFuture,
+                                         CompletableFuture<Void> 
customizedPartialUpdateFuture,
+                                         CompletableFuture<Void> 
customizedConsumerInitFuture) {
+        return createInstance(customizedRecheckFuture, 
customizedPartialUpdateFuture, customizedConsumerInitFuture,
+                null, null);
+    }
+
+    private QueueInstance createInstance(CompletableFuture<Void> 
customizedRecheckFuture,
+                                         CompletableFuture<Void> 
customizedPartialUpdateFuture,
+                                         CompletableFuture<Void> 
customizedConsumerInitFuture,
+                                         Collection<String> successTopics,
+                                         Collection<String> errorTopics) {
+        HashedWheelTimer timer = new HashedWheelTimer(new 
ExecutorProvider.ExtendedThreadFactory("timer-x",
+                Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.timer()).thenReturn(timer);
+
+        PatternMultiTopicsConsumerImpl patternConsumer = 
mock(PatternMultiTopicsConsumerImpl.class);
+        
when(patternConsumer.recheckTopicsChange()).thenReturn(customizedRecheckFuture);
+        when(patternConsumer.getClient()).thenReturn(client);
+        if (customizedConsumerInitFuture != null) {
+            
when(patternConsumer.getSubscribeFuture()).thenReturn(customizedConsumerInitFuture);
+        } else {
+            
when(patternConsumer.getSubscribeFuture()).thenReturn(CompletableFuture.completedFuture(null));
+        }
+
+        PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener =
+                
mock(PatternMultiTopicsConsumerImpl.TopicsChangedListener.class);
+        if (successTopics == null && errorTopics == null) {
+            
when(topicsChangeListener.onTopicsAdded(anyCollection())).thenReturn(customizedPartialUpdateFuture);
+            
when(topicsChangeListener.onTopicsRemoved(anyCollection())).thenReturn(customizedPartialUpdateFuture);
+        } else {
+            CompletableFuture<Void> ex = FutureUtil.failedFuture(new 
RuntimeException("mock error"));
+            
when(topicsChangeListener.onTopicsAdded(successTopics)).thenReturn(customizedPartialUpdateFuture);
+            
when(topicsChangeListener.onTopicsRemoved(successTopics)).thenReturn(customizedPartialUpdateFuture);
+            
when(topicsChangeListener.onTopicsAdded(errorTopics)).thenReturn(ex);
+            
when(topicsChangeListener.onTopicsRemoved(errorTopics)).thenReturn(ex);
+        }
+
+        PatternConsumerUpdateQueue queue = new 
PatternConsumerUpdateQueue(patternConsumer, topicsChangeListener);
+        return new QueueInstance(queue, patternConsumer, topicsChangeListener);
+    }
+
+    private QueueInstance createInstance() {
+        CompletableFuture<Void> completedFuture = 
CompletableFuture.completedFuture(null);
+        return createInstance(completedFuture, completedFuture, 
completedFuture);
+    }
+
+    @AllArgsConstructor
+    private static class QueueInstance implements Closeable {
+        private PatternConsumerUpdateQueue queue;
+        private PatternMultiTopicsConsumerImpl mockedConsumer;
+        private PatternMultiTopicsConsumerImpl.TopicsChangedListener 
mockedListener;
+
+        @Override
+        public void close() {
+            mockedConsumer.getClient().timer().stop();
+        }
+    }
+
+    @Test
+    public void testTopicsChangedEvents() {
+        QueueInstance instance = createInstance();
+
+        Collection<String> topics = Arrays.asList("a");
+        for (int i = 0; i < 10; i++) {
+            instance.queue.appendTopicsAddedOp(topics);
+            instance.queue.appendTopicsRemovedOp(topics);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            verify(instance.mockedListener, times(10)).onTopicsAdded(topics);
+            verify(instance.mockedListener, times(10)).onTopicsRemoved(topics);
+        });
+
+        // cleanup.
+        instance.close();
+    }
+
+    @Test
+    public void testRecheckTask() {
+        QueueInstance instance = createInstance();
+
+        for (int i = 0; i < 10; i++) {
+            instance.queue.appendRecheckOp();
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            verify(instance.mockedConsumer, times(10)).recheckTopicsChange();
+        });
+
+        // cleanup.
+        instance.close();
+    }
+
+    @Test
+    public void testDelayedRecheckTask() {
+        CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
+        CompletableFuture<Void> partialUpdateFuture = 
CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> consumerInitFuture = 
CompletableFuture.completedFuture(null);
+        QueueInstance instance = createInstance(recheckFuture, 
partialUpdateFuture, consumerInitFuture);
+
+        for (int i = 0; i < 10; i++) {
+            instance.queue.appendRecheckOp();
+        }
+
+        recheckFuture.complete(null);
+        Awaitility.await().untilAsserted(() -> {
+            // The first task will be running, and never completed until all 
tasks have been added.
+            // Since the first was started, the second one will not be skipped.
+            // The others after the second task will be skipped.
+            // So the times that called "recheckTopicsChange" will be 2.
+            verify(instance.mockedConsumer, times(2)).recheckTopicsChange();
+        });
+
+        // cleanup.
+        instance.close();
+    }
+
+    @Test
+    public void testCompositeTasks() {
+        CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
+        CompletableFuture<Void> partialUpdateFuture = 
CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> consumerInitFuture = 
CompletableFuture.completedFuture(null);
+        QueueInstance instance = createInstance(recheckFuture, 
partialUpdateFuture, consumerInitFuture);
+
+        Collection<String> topics = Arrays.asList("a");
+        for (int i = 0; i < 10; i++) {
+            instance.queue.appendRecheckOp();
+            instance.queue.appendTopicsAddedOp(topics);
+            instance.queue.appendTopicsRemovedOp(topics);
+        }
+        recheckFuture.complete(null);
+        Awaitility.await().untilAsserted(() -> {
+            // The first task will be running, and never completed until all 
tasks have been added.
+            // Since the first was started, the second one will not be skipped.
+            // The others after the second task will be skipped.
+            // So the times that called "recheckTopicsChange" will be 2.
+            verify(instance.mockedConsumer, times(2)).recheckTopicsChange();
+            // The tasks after the second "recheckTopicsChange" will be 
skipped due to there is a previous
+            //   "recheckTopicsChange" that has not been executed.
+            // The tasks between the fist "recheckTopicsChange" and the second 
"recheckTopicsChange" will be skipped
+            //   due to there is a following "recheckTopicsChange".
+            verify(instance.mockedListener, times(0)).onTopicsAdded(topics);
+            verify(instance.mockedListener, times(0)).onTopicsRemoved(topics);
+        });
+
+        // cleanup.
+        instance.close();
+    }
+
+    @Test
+    public void testErrorTask() {
+        CompletableFuture<Void> immediatelyCompleteFuture = 
CompletableFuture.completedFuture(null);
+        Collection<String> successTopics = Arrays.asList("a");
+        Collection<String> errorTopics = 
Arrays.asList(UUID.randomUUID().toString());
+        QueueInstance instance = createInstance(immediatelyCompleteFuture, 
immediatelyCompleteFuture,
+                immediatelyCompleteFuture, successTopics, errorTopics);
+
+        instance.queue.appendTopicsAddedOp(successTopics);
+        instance.queue.appendTopicsRemovedOp(successTopics);
+        instance.queue.appendTopicsAddedOp(errorTopics);
+        instance.queue.appendTopicsAddedOp(successTopics);
+        instance.queue.appendTopicsRemovedOp(successTopics);
+
+        Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
+            verify(instance.mockedListener, 
times(2)).onTopicsAdded(successTopics);
+            verify(instance.mockedListener, 
times(2)).onTopicsRemoved(successTopics);
+            verify(instance.mockedListener, 
times(1)).onTopicsAdded(errorTopics);
+            // After an error task will push a recheck task to offset.
+            verify(instance.mockedConsumer, times(1)).recheckTopicsChange();
+        });
+
+        // cleanup.
+        instance.close();
+    }
+
+    @Test
+    public void testFailedSubscribe() {
+        CompletableFuture<Void> immediatelyCompleteFuture = 
CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> consumerInitFuture = new CompletableFuture<>();
+        Collection<String> successTopics = Arrays.asList("a");
+        Collection<String> errorTopics = 
Arrays.asList(UUID.randomUUID().toString());
+        QueueInstance instance = createInstance(immediatelyCompleteFuture, 
immediatelyCompleteFuture,
+                consumerInitFuture, successTopics, errorTopics);
+
+        instance.queue.appendTopicsAddedOp(successTopics);
+        instance.queue.appendTopicsRemovedOp(successTopics);
+        instance.queue.appendTopicsAddedOp(errorTopics);
+        instance.queue.appendTopicsAddedOp(successTopics);
+        instance.queue.appendTopicsRemovedOp(successTopics);
+
+        // Consumer init failed after multi topics changes.
+        // All the topics changes events should be skipped.
+        consumerInitFuture.completeExceptionally(new RuntimeException("mocked 
ex"));
+        Awaitility.await().untilAsserted(() -> {
+            verify(instance.mockedListener, 
times(0)).onTopicsAdded(successTopics);
+            verify(instance.mockedListener, 
times(0)).onTopicsRemoved(successTopics);
+            verify(instance.mockedListener, 
times(0)).onTopicsAdded(errorTopics);
+            verify(instance.mockedConsumer, times(0)).recheckTopicsChange();
+        });
+
+        // cleanup.
+        instance.close();
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
index 5baca24cf8a..9c239785da7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
@@ -61,7 +61,7 @@ public class PatternMultiTopicsConsumerImplTest {
                         "persistent://tenant/my-ns/non-matching"),
                         null, false, true),
                 mockListener,
-                Collections.emptyList());
+                Collections.emptyList(), "");
         verify(mockListener).onTopicsAdded(Sets.newHashSet(
                 "persistent://tenant/my-ns/name-1",
                 "persistent://tenant/my-ns/name-2"));
@@ -80,7 +80,7 @@ public class PatternMultiTopicsConsumerImplTest {
                         "persistent://tenant/my-ns/name-2"),
                         "TOPICS_HASH", true, true),
                 mockListener,
-                Arrays.asList("persistent://tenant/my-ns/name-0"));
+                Arrays.asList("persistent://tenant/my-ns/name-0"), "");
         verify(mockListener).onTopicsAdded(Sets.newHashSet(
                 "persistent://tenant/my-ns/name-1",
                 "persistent://tenant/my-ns/name-2"));
@@ -99,7 +99,7 @@ public class PatternMultiTopicsConsumerImplTest {
                         "persistent://tenant/my-ns/name-2"),
                         "TOPICS_HASH", true, false),
                 mockListener,
-                Arrays.asList("persistent://tenant/my-ns/name-0"));
+                Arrays.asList("persistent://tenant/my-ns/name-0"), "");
         verify(mockListener, never()).onTopicsAdded(any());
         verify(mockListener, never()).onTopicsRemoved(any());
         verify(mockTopicsHashSetter).accept("TOPICS_HASH");
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 7e9fd601d4f..1d1a17d50eb 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -29,6 +29,7 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -68,8 +69,17 @@ public class TopicListWatcherTest {
                 thenReturn(clientCnxFuture.thenApply(clientCnx -> 
Pair.of(clientCnx, false)));
         when(client.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
         when(connectionPool.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
+
+        CompletableFuture<Void> completedFuture = 
CompletableFuture.completedFuture(null);
+        PatternMultiTopicsConsumerImpl patternConsumer = 
mock(PatternMultiTopicsConsumerImpl.class);
+        when(patternConsumer.getSubscribeFuture()).thenReturn(completedFuture);
+        
when(patternConsumer.recheckTopicsChange()).thenReturn(completedFuture);
+        
when(listener.onTopicsAdded(anyCollection())).thenReturn(completedFuture);
+        
when(listener.onTopicsRemoved(anyCollection())).thenReturn(completedFuture);
+        PatternConsumerUpdateQueue queue = new 
PatternConsumerUpdateQueue(patternConsumer, listener);
+
         watcherFuture = new CompletableFuture<>();
-        watcher = new TopicListWatcher(listener, client,
+        watcher = new TopicListWatcher(queue, client,
                 Pattern.compile(topic), 7,
                 NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {});
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
index 80f16e6c367..d36595d91ef 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.common.lookup;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -119,4 +122,25 @@ public class GetTopicsResult {
             return topics;
         }
     }
+
+    public GetTopicsResult filterTopics(Pattern topicsPattern) {
+        List<String> topicsFiltered = TopicList.filterTopics(getTopics(), 
topicsPattern);
+        // If nothing changed.
+        if (topicsFiltered.equals(getTopics())) {
+            GetTopicsResult newObj = new 
GetTopicsResult(nonPartitionedOrPartitionTopics, null, true, true);
+            newObj.topics = topics;
+            return newObj;
+        }
+        // Filtered some topics.
+        Set<String> topicsFilteredSet = new HashSet<>(topicsFiltered);
+        List<String> newTps = new ArrayList<>();
+        for (String tp: nonPartitionedOrPartitionTopics) {
+            if 
(topicsFilteredSet.contains(TopicName.get(tp).getPartitionedTopicName())) {
+                newTps.add(tp);
+            }
+        }
+        GetTopicsResult newObj = new GetTopicsResult(newTps, null, true, true);
+        newObj.topics = topicsFiltered;
+        return newObj;
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index e051e01495d..2298a8d06a8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
 
@@ -102,6 +103,14 @@ public class TopicName implements ServiceUnitId {
         }
     }
 
+    public static String getPartitionPattern(String topic) {
+        return "^" + 
Pattern.quote(get(topic).getPartitionedTopicName().toString()) + 
"-partition-[0-9]+$";
+    }
+
+    public static String getPattern(String topic) {
+        return "^" + 
Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$";
+    }
+
     @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
     private TopicName(String completeTopicName) {
         try {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 4c0a8d500b7..a66d327dbab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -28,7 +28,9 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
 
 @UtilityClass
 public class TopicList {
@@ -86,9 +88,18 @@ public class TopicList {
         if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
             return originalRegexp;
         }
-        String removedTopicDomain = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
+        String[] parts = 
SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
+        String prefix = parts[0];
+        String removedTopicDomain = parts[1];
+        if (prefix.equals(TopicDomain.persistent.value()) || 
prefix.equals(TopicDomain.non_persistent.value())) {
+            prefix = "";
+        } else if (prefix.endsWith(TopicDomain.persistent.value())){
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.persistent.value().length());
+        } else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
+            prefix = prefix.substring(0, prefix.length() - 
TopicDomain.non_persistent.value().length());
+        }
         if (originalRegexp.contains("^")) {
-            return String.format("^%s", removedTopicDomain);
+            return String.format("%s%s", prefix, removedTopicDomain);
         } else {
             return removedTopicDomain;
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 0628d494af3..454eee0f966 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -54,6 +54,9 @@ public class FutureUtil {
      * @return a new CompletableFuture that is completed when all of the given 
CompletableFutures complete
      */
     public static CompletableFuture<Void> waitForAll(Collection<? extends 
CompletableFuture<?>> futures) {
+        if (futures == null || futures.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
         return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
     }
 

Reply via email to