This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d073a7c  PIP-13-1/3: Provide `TopicsConsumer` to consume from several 
topics under same namespace (#1103)
d073a7c is described below

commit d073a7c874c391731d703f86ce13849ee7312f0e
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Feb 21 14:15:40 2018 -0800

    PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under 
same namespace (#1103)
    
    * add TopicsConsumerImpl and some test
    
    * fix license header check, add get method, and fix unit-test fail
    
    * add subscribe and unsubscribe for one topic, add tests
    
    * change to use subscribeAsync(topic) to subscribe topics in init
    
    * change following comments
    
    * rebase master, fix compile issue
---
 .../broker/service/PersistentQueueE2ETest.java     |   4 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 522 +++++++++++++
 .../org/apache/pulsar/client/api/PulsarClient.java |  57 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  13 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  11 +-
 .../client/impl/PartitionedConsumerImpl.java       |  15 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  65 ++
 .../pulsar/client/impl/TopicMessageIdImpl.java     |  49 ++
 .../pulsar/client/impl/TopicMessageImpl.java       | 101 +++
 .../pulsar/client/impl/TopicsConsumerImpl.java     | 823 +++++++++++++++++++++
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  44 +-
 .../client/impl/UnAckedTopicMessageTracker.java    |  49 ++
 12 files changed, 1712 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 7a65cbc..f311f4c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -539,11 +539,11 @@ public class PersistentQueueE2ETest extends 
BrokerTestBase {
             producer.send(("hello-" + i).getBytes());
         }
 
-        Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();
+        Set<MessageId> c1_receivedMessages = new HashSet<>();
 
         // C-1 gets all messages but doesn't ack
         for (int i = 0; i < numMsgs; i++) {
-            c1_receivedMessages.add((MessageIdImpl) 
consumer1.receive().getMessageId());
+            c1_receivedMessages.add(consumer1.receive().getMessageId());
         }
 
         // C-2 will not get any message initially, since everything went to 
C-1 already
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
new file mode 100644
index 0000000..a2cb6d5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicsConsumerImplTest extends ProducerConsumerBase {
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = 
LoggerFactory.getLogger(TopicsConsumerImplTest.class);
+    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    // Verify that subscribing to topics from different namespaces fails with an error.
+    @Test(timeOut = testTimeout)
+    public void testDifferentTopicsNameSubscribe() throws Exception {
+        String key = "TopicsFromDifferentNamespace";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        try {
+            Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+            fail("subscribe for topics from different namespace should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected: the topics belong to different namespaces
+        }
+    }
+
+
+    @Test(timeOut = testTimeout)
+    public void testGetConsumersAndGetTopics() throws Exception {
+        String key = "TopicsConsumerGet";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        List<String> topics = ((TopicsConsumerImpl) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+
+        topics.forEach(topic -> log.info("topic: {}", topic));
+        consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
+
+        IntStream.range(0, 6).forEach(index ->
+            
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
+
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testSyncProducerAndConsumer() throws Exception {
+        String key = "TopicsConsumerSyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
+        
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testAsyncConsumer() throws Exception {
+        String key = "TopicsConsumerAsyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
+        
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // Asynchronously produce messages
+        List<Future<MessageId>> futures = Lists.newArrayList();
+        for (int i = 0; i < totalMessages / 3; i++) {
+            futures.add(producer1.sendAsync((messagePredicate + "producer1-" + 
i).getBytes()));
+            futures.add(producer2.sendAsync((messagePredicate + "producer2-" + 
i).getBytes()));
+            futures.add(producer3.sendAsync((messagePredicate + "producer3-" + 
i).getBytes()));
+        }
+        log.info("Waiting for async publish to complete : {}", futures.size());
+        for (Future<MessageId> future : futures) {
+            future.get();
+        }
+
+        log.info("start async consume");
+        CountDownLatch latch = new CountDownLatch(totalMessages);
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        executor.execute(() -> IntStream.range(0, totalMessages).forEach(index 
->
+            consumer.receiveAsync()
+                .thenAccept(msg -> {
+                    assertTrue(msg instanceof TopicMessageImpl);
+                    try {
+                        consumer.acknowledge(msg);
+                    } catch (PulsarClientException e1) {
+                        fail("message acknowledge failed", e1);
+                    }
+                    latch.countDown();
+                    log.info("receive index: {}, latch countDown: {}", index, 
latch.getCount());
+                })
+                .exceptionally(ex -> {
+                    log.warn("receive index: {}, failed receive message {}", 
index, ex.getMessage());
+                    ex.printStackTrace();
+                    return null;
+                })));
+
+        latch.await();
+        log.info("success latch wait");
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testConsumerUnackedRedelivery() throws Exception {
+        String key = "TopicsConsumerRedeliveryTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
+        
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // 4. Receiver receives the messages without acking them; Unacked Message Tracker 
size should be totalMessages.
+        Message message = consumer.receive();
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            log.debug("Consumer received : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        }
+        long size = ((TopicsConsumerImpl) 
consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, totalMessages);
+
+        // 5. Blocking call, redeliver should kick in, after receive and ack, 
Unacked Message Tracker size should be 0.
+        message = consumer.receive();
+        HashSet<String> hSet = new HashSet<>();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            hSet.add(new String(message.getData()));
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+
+        size = ((TopicsConsumerImpl) 
consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(hSet.size(), totalMessages);
+
+        // 6. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + 
i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + 
i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + 
i).getBytes());
+        }
+
+        // 7. Receiver receives the message, ack them
+        message = consumer.receive();
+        int received = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            received++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) 
consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(received, totalMessages);
+
+        // 8. Simulate ackTimeout
+        ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle();
+        ((TopicsConsumerImpl) consumer).getConsumers().forEach(c -> 
c.getUnAckedMessageTracker().toggle());
+
+        // 9. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + 
i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + 
i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + 
i).getBytes());
+        }
+
+        // 10. Receiver receives the message, doesn't ack
+        message = consumer.receive();
+        while (message != null) {
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) 
consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 30);
+
+        Thread.sleep(ackTimeOutMillis);
+
+        // 11. Receiver receives redelivered messages
+        message = consumer.receive();
+        int redelivered = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            redelivered++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        assertEquals(redelivered, 30);
+        size =  ((TopicsConsumerImpl) 
consumer).getUnAckedMessageTracker().size();
+        log.info(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test
+    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
+        String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, 
topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new 
ProducerConfiguration();
+        
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, 
producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, 
producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, 
subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 4. unsubscribe topic3
+        CompletableFuture<Void> unsubFuture = 
((TopicsConsumerImpl)consumer).unsubscribeAsync(topicName3);
+        unsubFuture.get();
+
+        // 5. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + 
i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + 
i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + 
i).getBytes());
+        }
+
+        // 6. should not receive messages from topic3, verify get 2/3 of all 
messages
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages * 2 / 3);
+
+        // 7. use getter to verify internal topics number after un-subscribe 
topic3
+        List<String> topics = ((TopicsConsumerImpl) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+
+        assertEquals(topics.size(), 3);
+        assertEquals(consumers.size(), 3);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 2);
+
+        // 8. re-subscribe topic3
+        CompletableFuture<Void> subFuture = 
((TopicsConsumerImpl)consumer).subscribeAsync(topicName3);
+        subFuture.get();
+
+        // 9. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + 
i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + 
i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + 
i).getBytes());
+        }
+
+        // 10. should receive messages from all 3 topics
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 11. use getter to verify internal topics number after subscribe 
topic3
+        topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 6ba2518..8de76f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -320,4 +321,60 @@ public interface PulsarClient extends Closeable {
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
+
+
+    /**
+     * Subscribe to the given topics and subscription combination with default 
{@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same 
namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     */
+    Consumer subscribe(Collection<String> topics, String subscription) throws 
PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics and subscription 
combination with
+     * default {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same 
namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, 
String subscription);
+
+    /**
+     * Subscribe to the given topics and subscription combination using given 
{@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same 
namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return The {@code Consumer} object
+     */
+    Consumer subscribe(Collection<String> topics, String subscription, 
ConsumerConfiguration conf)
+        throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics and subscription 
combination using given
+     * {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same 
namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+                                               String subscription,
+                                               ConsumerConfiguration conf);
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1886c76..ef7ab8b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.collect.Queues;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +28,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -42,8 +42,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
-import com.google.common.collect.Queues;
-
 public abstract class ConsumerBase extends HandlerBase implements Consumer {
 
     enum ConsumerType {
@@ -59,7 +57,7 @@ public abstract class ConsumerBase extends HandlerBase 
implements Consumer {
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> 
pendingReceives;
-    protected final int maxReceiverQueueSize;
+    protected int maxReceiverQueueSize;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, String 
subscription, ConsumerConfiguration conf,
             int receiverQueueSize, ExecutorService listenerExecutor, 
CompletableFuture<Consumer> subscribeFuture) {
@@ -333,7 +331,7 @@ public abstract class ConsumerBase extends HandlerBase 
implements Consumer {
      * the connected consumers. This is a non blocking call and doesn't throw 
an exception. In case the connection
      * breaks, the messages are redelivered after reconnect.
      */
-    protected abstract void redeliverUnacknowledgedMessages(Set<MessageIdImpl> 
messageIds);
+    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> 
messageIds);
 
     @Override
     public String toString() {
@@ -343,4 +341,9 @@ public abstract class ConsumerBase extends HandlerBase 
implements Consumer {
                 ", topic='" + topic + '\'' +
                 '}';
     }
+
+    /**
+     * Replaces the value stored in {@code maxReceiverQueueSize}.
+     *
+     * @param newSize
+     *            the value to store in {@code maxReceiverQueueSize}
+     */
+    protected void setMaxReceiverQueueSize(int newSize) {
+        this.maxReceiverQueueSize = newSize;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 219479f..93d0d87 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1188,7 +1188,9 @@ public class ConsumerImpl extends ConsumerBase {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) 
{
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof 
MessageIdImpl);
+
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not 
Shared
             redeliverUnacknowledgedMessages();
@@ -1197,7 +1199,10 @@ public class ConsumerImpl extends ConsumerBase {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v2.getNumber()) {
             int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
-            Iterable<List<MessageIdImpl>> batches = 
Iterables.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED);
+            Iterable<List<MessageIdImpl>> batches = Iterables.partition(
+                messageIds.stream()
+                    .map(messageId -> (MessageIdImpl)messageId)
+                    .collect(Collectors.toSet()), 
MAX_REDELIVER_UNACKNOWLEDGED);
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = 
ids.stream().map(messageId -> {
@@ -1374,7 +1379,7 @@ public class ConsumerImpl extends ConsumerBase {
         return messageId;
     }
 
-    private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+    private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message peek = incomingMessages.peek();
         if (peek != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 92cbea2..332dd56 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -470,7 +470,8 @@ public class PartitionedConsumerImpl extends ConsumerBase {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) 
{
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof 
MessageIdImpl);
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not 
Shared
             redeliverUnacknowledgedMessages();
@@ -478,9 +479,11 @@ public class PartitionedConsumerImpl extends ConsumerBase {
         }
         removeExpiredMessagesFromQueue(messageIds);
         messageIds.stream()
-                
.collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, 
Collectors.toSet()))
-                .forEach((partitionIndex, messageIds1) ->
-                        
consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1));
+            .map(messageId -> (MessageIdImpl)messageId)
+            .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, 
Collectors.toSet()))
+            .forEach((partitionIndex, messageIds1) ->
+                consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
+                    messageIds1.stream().map(mid -> 
(MessageId)mid).collect(Collectors.toSet())));
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
@@ -548,10 +551,10 @@ public class PartitionedConsumerImpl extends ConsumerBase 
{
         return unAckedMessageTracker;
     }
 
-    private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) 
{
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         Message peek = incomingMessages.peek();
         if (peek != null) {
-            if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) {
+            if (!messageIds.contains(peek.getMessageId())) {
                 // first message is not expired, then no message is expired in 
queue.
                 return;
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6e1c5af..5d7aab8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -307,6 +308,70 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerSubscribedFuture;
     }
 
+
+    @Override
+    public Consumer subscribe(Collection<String> topics, final String 
subscription) throws PulsarClientException {
+        return subscribe(topics, subscription, new ConsumerConfiguration());
+    }
+
+    /**
+     * Blocking variant of {@code subscribeAsync(topics, subscription, conf)}: waits for the
+     * future and converts any failure into a {@code PulsarClientException}.
+     */
+    @Override
+    public Consumer subscribe(Collection<String> topics,
+                              String subscription,
+                              ConsumerConfiguration conf)
+        throws PulsarClientException {
+        try {
+            return subscribeAsync(topics, subscription, conf).get();
+        } catch (InterruptedException e) {
+            // Keep the interrupt flag set for whoever is further up the stack
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (!(cause instanceof PulsarClientException)) {
+                throw new PulsarClientException(cause);
+            }
+            // Already the client's own exception type: rethrow it unwrapped
+            throw (PulsarClientException) cause;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> 
topics, String subscription) {
+        return subscribeAsync(topics, subscription, new 
ConsumerConfiguration());
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> 
topics,
+                                                      String subscription,
+                                                      ConsumerConfiguration 
conf) {
+        if (topics == null || topics.isEmpty()) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.InvalidTopicNameException("Empty topics name"));
+        }
+
+        if (state.get() != State.Open) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+
+        if (isBlank(subscription)) {
+            return FutureUtil
+                .failedFuture(new 
PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+        }
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                new 
PulsarClientException.InvalidConfigurationException("Consumer configuration 
undefined"));
+        }
+
+        CompletableFuture<Consumer> consumerSubscribedFuture = new 
CompletableFuture<>();
+
+        ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, 
topics, subscription,
+            conf, externalExecutorProvider.getExecutor(),
+            consumerSubscribedFuture);
+        synchronized (consumers) {
+            consumers.put(consumer, Boolean.TRUE);
+        }
+
+        return consumerSubscribedFuture;
+    }
+
     @Override
     public Reader createReader(String topic, MessageId startMessageId, 
ReaderConfiguration conf)
             throws PulsarClientException {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
new file mode 100644
index 0000000..c7cc453
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
+    private final String topicName;
+    private final MessageId messageId;
+
+    TopicMessageIdImpl(String topicName, MessageId messageId) {
+        this.topicName = topicName;
+        this.messageId = messageId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public MessageId getInnerMessageId() {
+        return messageId;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        return messageId.toByteArray();
+    }
+
+    @Override
+    public int compareTo(MessageId o) {
+        return messageId.compareTo(o);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
new file mode 100644
index 0000000..619d15c
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * A {@link Message} decorated with the name of the topic it was received from.
+ *
+ * Every accessor except {@link #getTopicName()} and {@link #getMessageId()} simply
+ * forwards to the wrapped message.
+ */
+public class TopicMessageImpl implements Message {
+
+    // Topic name supplied by the creator of this wrapper
+    private final String topicName;
+    // The wrapped message all content accessors delegate to
+    private final Message msg;
+    // Topic-aware id built once in the constructor from topicName and the wrapped message's id
+    private final MessageId msgId;
+
+    /**
+     * Wraps {@code msg} and builds a {@code TopicMessageIdImpl} from {@code topicName}
+     * and the id of {@code msg}.
+     */
+    TopicMessageImpl(String topicName,
+                     Message msg) {
+        this.topicName = topicName;
+        this.msg = msg;
+        this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+    }
+
+    /**
+     * Get the topic name of this message.
+     * @return the name of the topic on which this message was published
+     */
+    public String getTopicName() {
+        return topicName;
+    }
+
+    /**
+     * @return the topic-aware id created in the constructor, not the wrapped message's own id
+     */
+    @Override
+    public MessageId getMessageId() {
+        return msgId;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return msg.getProperties();
+    }
+
+    @Override
+    public boolean hasProperty(String name) {
+        return msg.hasProperty(name);
+    }
+
+    @Override
+    public String getProperty(String name) {
+        return msg.getProperty(name);
+    }
+
+    @Override
+    public byte[] getData() {
+        return msg.getData();
+    }
+
+    @Override
+    public long getPublishTime() {
+        return msg.getPublishTime();
+    }
+
+    @Override
+    public long getEventTime() {
+        return msg.getEventTime();
+    }
+
+    @Override
+    public long getSequenceId() {
+        return msg.getSequenceId();
+    }
+
+    @Override
+    public String getProducerName() {
+        return msg.getProducerName();
+    }
+
+    @Override
+    public boolean hasKey() {
+        return msg.hasKey();
+    }
+
+    @Override
+    public String getKey() {
+        return msg.getKey();
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
new file mode 100644
index 0000000..852c5d2
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+    // All topics should be in same namespace
+    protected NamespaceName namespaceName;
+
+    // Map <topic+partition, consumer>; when acknowledging, the consumer is looked up by topic name
+    private final ConcurrentHashMap<String, ConsumerImpl> consumers;
+
+    // Map <topic, partitionNumber>, store partition number for each topic
+    private final ConcurrentHashMap<String, Integer> topics;
+
+    // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+    // shared incoming queue was full
+    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+
+    // Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+    // resume receiving from the paused consumer partitions
+    private final int sharedQueueResumeThreshold;
+
+    // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+    AtomicInteger numberTopicPartitions;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ConsumerStats stats;
+    private final UnAckedMessageTracker unAckedMessageTracker;
+    private final ConsumerConfiguration internalConfig;
+
+    /**
+     * Creates a consumer that merges the messages of every topic in {@code topics} into one
+     * shared incoming queue.
+     *
+     * With an empty collection the consumer becomes ready immediately. Otherwise the topic
+     * names are validated, each topic is subscribed asynchronously, and the subscribe future
+     * is completed (or failed) once all of those subscriptions have finished.
+     */
+    TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics, String subscription,
+                       ConsumerConfiguration conf, ExecutorService listenerExecutor,
+                       CompletableFuture<Consumer> subscribeFuture) {
+        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(),
+            subscription, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+            subscribeFuture);
+
+        checkArgument(conf.getReceiverQueueSize() > 0,
+            "Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+        this.topics = new ConcurrentHashMap<>();
+        this.consumers = new ConcurrentHashMap<>();
+        this.pausedConsumers = new ConcurrentLinkedQueue<>();
+        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+        this.numberTopicPartitions = new AtomicInteger(0);
+
+        // An ack timeout of 0 means un-acked message tracking is switched off
+        if (conf.getAckTimeoutMillis() == 0) {
+            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
+        } else {
+            this.unAckedMessageTracker =
+                new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+        }
+
+        this.internalConfig = getInternalConsumerConfig();
+        if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
+            this.stats = new ConsumerStats();
+        } else {
+            this.stats = null;
+        }
+
+        // Nothing to subscribe to: the consumer is usable right away
+        if (topics.isEmpty()) {
+            this.namespaceName = null;
+            setState(State.Ready);
+            subscribeFuture().complete(this);
+            return;
+        }
+
+        checkArgument(topicNamesValid(topics), "Topics should have same namespace.");
+        // All topics share one namespace, so the first topic determines it
+        this.namespaceName = topics.stream().findFirst()
+            .flatMap(first -> Optional.of(DestinationName.get(first).getNamespaceObject()))
+            .get();
+
+        List<CompletableFuture<Void>> subscriptions = topics.stream()
+            .map(topicName -> subscribeAsync(topicName))
+            .collect(Collectors.toList());
+
+        FutureUtil.waitForAll(subscriptions)
+            .thenAccept(ignored -> {
+                try {
+                    // Grow the queue limit when there are more partitions than the current limit
+                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    }
+                    setState(State.Ready);
+                    // We have successfully created N consumers, so we can start receiving
+                    // messages now
+                    startReceivingMessages(Lists.newArrayList(consumers.values()));
+                    subscribeFuture().complete(this);
+                    log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
+                        topic, subscription, numberTopicPartitions.get());
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}",
+                        topic, e.getMessage());
+                    subscribeFuture.completeExceptionally(e);
+                }
+            })
+            .exceptionally(ex -> {
+                log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
+                subscribeFuture.completeExceptionally(ex);
+                return null;
+            });
+    }
+
+    /**
+     * Checks a collection of topic names. The collection is accepted when
+     * - each topic name is valid,
+     * - every topic is in the same namespace,
+     * - topic names are unique.
+     *
+     * A collection holding a single topic is accepted. Each name is validated before it is
+     * parsed, so an invalid first entry yields {@code false} like any other invalid entry.
+     *
+     * @param topics
+     *            the topic names to check; must be non-null and non-empty
+     * @return {@code true} when all conditions hold, {@code false} otherwise
+     * @throws IllegalStateException
+     *             if {@code topics} is null or empty
+     */
+    private static boolean topicNamesValid(Collection<String> topics) {
+        checkState(topics != null && topics.size() >= 1,
+            "topics should contain at least 1 topic");
+
+        // Namespace of the first topic; every later topic must match it
+        String namespace = null;
+        for (String topic : topics) {
+            if (!DestinationName.isValid(topic)) {
+                log.warn("Received invalid topic name: {}", topic);
+                return false;
+            }
+
+            String topicNamespace = DestinationName.get(topic).getNamespace();
+            if (namespace == null) {
+                namespace = topicNamespace;
+            } else if (!namespace.equals(topicNamespace)) {
+                log.warn("Received topic {} which is not in namespace {}", topic, namespace);
+                return false;
+            }
+        }
+
+        // check topic names are unique
+        Set<String> uniqueTopics = new HashSet<>(topics);
+        if (uniqueTopics.size() != topics.size()) {
+            log.warn("Topic names not unique. unique/all : {}/{}",
+                uniqueTopics.size(), topics.size());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Sends flow permits for each of the given sub-consumers and starts the receive loop on
+     * it. Does nothing beyond debug logging unless this consumer is in the {@code Ready} state.
+     */
+    private void startReceivingMessages(List<ConsumerImpl> newConsumers)
+        throws PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
+                topic, newConsumers.size(), getState());
+        }
+
+        if (getState() != State.Ready) {
+            return;
+        }
+
+        for (ConsumerImpl newConsumer : newConsumers) {
+            newConsumer.sendFlowPermitsToBroker(newConsumer.cnx(), conf.getReceiverQueueSize());
+            receiveMessageFromConsumer(newConsumer);
+        }
+    }
+
+    /**
+     * Requests one message from {@code consumer}. When it arrives the message is handed to
+     * {@code messageReceived}, after which the consumer is either scheduled for another
+     * receive or parked in {@code pausedConsumers}.
+     */
+    private void receiveMessageFromConsumer(ConsumerImpl consumer) {
+        consumer.receiveAsync().thenAccept(received -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Receive message from sub consumer:{}",
+                    topic, subscription, consumer.getTopic());
+            }
+            // Process the message, add to the queue and trigger listener or async callback
+            messageReceived(consumer, received);
+
+            // pausedConsumers may be modified below, so take the write lock
+            lock.writeLock().lock();
+            try {
+                int queued = incomingMessages.size();
+                boolean queueFull = queued >= maxReceiverQueueSize;
+                // Give already-paused consumers a fair chance once the queue is over the
+                // resume threshold
+                boolean yieldToPaused =
+                    queued > sharedQueueResumeThreshold && !pausedConsumers.isEmpty();
+
+                if (!queueFull && !yieldToPaused) {
+                    // Schedule the next receiveAsync() on a different thread to avoid
+                    // recursion and stack overflow
+                    client.eventLoopGroup().execute(() -> receiveMessageFromConsumer(consumer));
+                } else {
+                    // Park this consumer; it is resumed later when the queue drains
+                    pausedConsumers.add(consumer);
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
+    }
+
+    /**
+     * Handles a message delivered by one of the sub-consumers.
+     *
+     * The message is wrapped in a {@code TopicMessageImpl} carrying the sub-consumer's topic,
+     * registered with the un-acked tracker, and then either used to complete a pending
+     * {@code receiveAsync} future or put on the shared incoming queue. If a listener is
+     * configured, a task is scheduled that dequeues one message and passes it to the listener.
+     *
+     * @param consumer
+     *            the sub-consumer the message came from
+     * @param message
+     *            the received message; must be a {@code MessageImpl}
+     */
+    private void messageReceived(ConsumerImpl consumer, Message message) {
+        checkArgument(message instanceof MessageImpl);
+        lock.writeLock().lock();
+        try {
+            TopicMessageImpl topicMessage = new TopicMessageImpl(consumer.getTopic(), message);
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Received message from topics-consumer {}",
+                    topic, subscription, message.getMessageId());
+            }
+
+            // if asyncReceive is waiting : return message to callback without adding to
+            // incomingMessages queue
+            if (!pendingReceives.isEmpty()) {
+                CompletableFuture<Message> receivedFuture = pendingReceives.poll();
+                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+            } else {
+                // Enqueue the message so that it can be retrieved when application calls
+                // receive(). Waits for the queue to have space for the message.
+                // This should never block cause TopicsConsumerImpl should always use
+                // GrowableArrayBlockingQueue
+                incomingMessages.put(topicMessage);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (listener != null) {
+            // Trigger the notification on the message listener in a separate thread to avoid
+            // blocking the networking thread while the message processing happens
+            listenerExecutor.execute(() -> {
+                Message msg;
+                try {
+                    msg = internalReceive();
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] [{}] Failed to dequeue the message for listener",
+                        topic, subscription, e);
+                    return;
+                }
+
+                try {
+                    // Log the message actually dequeued for the listener; it is not
+                    // necessarily the one whose arrival scheduled this task
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Calling message listener for message {}",
+                            topic, subscription, msg.getMessageId());
+                    }
+                    listener.received(TopicsConsumerImpl.this, msg);
+                } catch (Throwable t) {
+                    log.error("[{}][{}] Message listener error in processing message: {}",
+                        topic, subscription, msg, t);
+                }
+            });
+        }
+    }
+
+    private void resumeReceivingFromPausedConsumersIfNeeded() {
+        lock.readLock().lock();
+        try {
+            if (incomingMessages.size() <= sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty()) {
+                while (true) {
+                    ConsumerImpl consumer = pausedConsumers.poll();
+                    if (consumer == null) {
+                        break;
+                    }
+
+                    // if messages are readily available on consumer we will 
attempt to writeLock on the same thread
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Blocks until a message is available on the shared incoming queue, registers it with
+     * the un-acked tracker and resumes paused sub-consumers if the queue has drained enough.
+     *
+     * @throws PulsarClientException
+     *             if the calling thread is interrupted while waiting
+     */
+    @Override
+    protected Message internalReceive() throws PulsarClientException {
+        try {
+            Message next = incomingMessages.take();
+            checkState(next instanceof TopicMessageImpl);
+            unAckedMessageTracker.add(next.getMessageId());
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return next;
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before reporting the failure
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Waits up to the given timeout for a message on the shared incoming queue.
+     *
+     * A received message is registered with the un-acked tracker. Paused sub-consumers are
+     * resumed if the queue has drained enough, whether or not a message was received.
+     *
+     * @param timeout
+     *            how long to wait, in units of {@code unit}
+     * @param unit
+     *            the unit of {@code timeout}
+     * @return the received message, or {@code null} if the timeout elapsed
+     * @throws PulsarClientException
+     *             if the calling thread is interrupted while waiting
+     */
+    @Override
+    protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
+        Message message;
+        try {
+            message = incomingMessages.poll(timeout, unit);
+            if (message != null) {
+                // The queue content is internal state, not a caller argument, so use
+                // checkState like the other receive methods do
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+            }
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Returns a future for the next message.
+     *
+     * If a message is already queued the future is completed immediately; otherwise the
+     * future is added to {@code pendingReceives}, from which {@code messageReceived} takes
+     * futures to complete as messages arrive.
+     *
+     * @return a future completed with the next message, or completed exceptionally if the
+     *         calling thread is interrupted
+     */
+    @Override
+    protected CompletableFuture<Message> internalReceiveAsync() {
+        CompletableFuture<Message> result = new CompletableFuture<>();
+        // Acquire the lock before entering try, so that finally only ever unlocks a lock
+        // this thread actually holds
+        lock.writeLock().lock();
+        try {
+            Message message = incomingMessages.poll(0, TimeUnit.SECONDS);
+            if (message == null) {
+                pendingReceives.add(result);
+            } else {
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+                resumeReceivingFromPausedConsumersIfNeeded();
+                result.complete(message);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            result.completeExceptionally(new PulsarClientException(e));
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return result;
+    }
+
+    /**
+     * Acknowledges a single message by forwarding the wrapped id to the sub-consumer
+     * registered for the id's topic, then removes the id from the un-acked tracker.
+     *
+     * @param messageId
+     *            the id to acknowledge; must be a {@code TopicMessageIdImpl}
+     * @param ackType
+     *            the kind of acknowledgement; {@code Cumulative} is not supported
+     * @param properties
+     *            passed through unchanged to the sub-consumer
+     * @return a future that fails if this consumer is not ready, if {@code ackType} is
+     *         cumulative, or if no sub-consumer is registered for the message's topic
+     */
+    @Override
+    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
+        checkArgument(messageId instanceof TopicMessageIdImpl);
+        TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+
+        if (getState() != State.Ready) {
+            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
+        }
+
+        if (ackType == AckType.Cumulative) {
+            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(
+                    "Cumulative acknowledge not supported for topics consumer"));
+        }
+
+        ConsumerImpl consumer = consumers.get(messageId1.getTopicName());
+        if (consumer == null) {
+            // No sub-consumer for this topic: report it through the future instead of
+            // failing with a NullPointerException
+            return FutureUtil.failedFuture(new PulsarClientException(
+                    "No consumer found for topic " + messageId1.getTopicName()));
+        }
+
+        MessageId innerId = messageId1.getInnerMessageId();
+        return consumer.doAcknowledge(innerId, ackType, properties)
+            .thenRun(() ->
+                unAckedMessageTracker.remove(messageId1));
+    }
+
+    /**
+     * Unsubscribes all internal consumers. When every one of them succeeds, this consumer moves
+     * to Closed and its un-acked tracker is closed; on any failure it moves to Failed.
+     *
+     * @return a future mirroring the combined outcome; fails immediately with
+     *         AlreadyClosedException when this consumer is already closing or closed
+     */
+    @Override
+    public CompletableFuture<Void> unsubscribeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+        setState(State.Closing);
+
+        CompletableFuture<Void> allUnsubscribed = new CompletableFuture<>();
+        List<CompletableFuture<Void>> perConsumerFutures = consumers.values().stream()
+            .map(ConsumerImpl::unsubscribeAsync)
+            .collect(Collectors.toList());
+
+        FutureUtil.waitForAll(perConsumerFutures).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                setState(State.Failed);
+                allUnsubscribed.completeExceptionally(failure);
+                log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                    topic, subscription, consumerName, failure.getCause());
+                return;
+            }
+
+            setState(State.Closed);
+            unAckedMessageTracker.close();
+            allUnsubscribed.complete(null);
+            log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                topic, subscription, consumerName);
+        });
+
+        return allUnsubscribed;
+    }
+
+    /**
+     * Closes all internal consumers. On success this consumer moves to Closed, is removed from
+     * the client, and any pending receive futures are failed; on failure it moves to Failed.
+     *
+     * @return a future mirroring the combined outcome; already completed when this consumer is
+     *         closing or closed
+     */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            unAckedMessageTracker.close();
+            return CompletableFuture.completedFuture(null);
+        }
+        setState(State.Closing);
+
+        CompletableFuture<Void> allClosed = new CompletableFuture<>();
+        List<CompletableFuture<Void>> perConsumerFutures = consumers.values().stream()
+            .map(ConsumerImpl::closeAsync)
+            .collect(Collectors.toList());
+
+        FutureUtil.waitForAll(perConsumerFutures).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                setState(State.Failed);
+                allClosed.completeExceptionally(failure);
+                log.error("[{}] [{}] Could not close Topics Consumer", topic, subscription,
+                    failure.getCause());
+                return;
+            }
+
+            setState(State.Closed);
+            unAckedMessageTracker.close();
+            allClosed.complete(null);
+            log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
+            client.cleanupConsumer(this);
+            // let the application know that outstanding receiveAsync() calls will never succeed
+            failPendingReceive();
+        });
+
+        return allClosed;
+    }
+
+    /**
+     * Drains {@code pendingReceives}, failing each queued future with AlreadyClosedException.
+     * Nothing is drained when the listener executor is absent or already shut down.
+     */
+    private void failPendingReceive() {
+        lock.readLock().lock();
+        try {
+            if (listenerExecutor == null || listenerExecutor.isShutdown()) {
+                return;
+            }
+
+            // poll() yields null once the queue has been emptied, which ends the loop
+            CompletableFuture<Message> pending;
+            while ((pending = pendingReceives.poll()) != null) {
+                pending.completeExceptionally(
+                        new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @return true only when every internal consumer reports being connected (vacuously true
+     *         when there are no internal consumers)
+     */
+    @Override
+    public boolean isConnected() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.isConnected()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    void connectionFailed(PulsarClientException exception) {
+        // Intentionally empty: this method performs no work in the topics consumer.
+    }
+
+    @Override
+    void connectionOpened(ClientCnx cnx) {
+        // Intentionally empty: this method performs no work in the topics consumer.
+    }
+
+    /** @return the subscription name, which doubles as this handler's name */
+    @Override
+    String getHandlerName() {
+        return subscription;
+    }
+
+    /**
+     * Builds the configuration handed to each internal per-topic consumer by copying the
+     * relevant settings from this consumer's own configuration.
+     *
+     * @return a new configuration carrying receiver queue size, subscription type, consumer name
+     *         and, when set, the crypto settings and ack timeout
+     */
+    private ConsumerConfiguration getInternalConsumerConfig() {
+        ConsumerConfiguration perTopicConf = new ConsumerConfiguration();
+        perTopicConf.setReceiverQueueSize(conf.getReceiverQueueSize());
+        perTopicConf.setSubscriptionType(conf.getSubscriptionType());
+        perTopicConf.setConsumerName(consumerName);
+
+        // crypto settings are only copied when a key reader has been configured
+        if (conf.getCryptoKeyReader() != null) {
+            perTopicConf.setCryptoKeyReader(conf.getCryptoKeyReader());
+            perTopicConf.setCryptoFailureAction(conf.getCryptoFailureAction());
+        }
+
+        // an ack timeout of 0 is left unset on the internal configuration
+        if (conf.getAckTimeoutMillis() != 0) {
+            perTopicConf.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        return perTopicConf;
+    }
+
+    /**
+     * Asks every internal consumer to redeliver its unacknowledged messages, then discards the
+     * locally queued messages and all ack-timeout tracking state.
+     */
+    @Override
+    public void redeliverUnacknowledgedMessages() {
+        synchronized (this) {
+            for (ConsumerImpl consumer : consumers.values()) {
+                consumer.redeliverUnacknowledgedMessages();
+            }
+            incomingMessages.clear();
+            unAckedMessageTracker.clear();
+            resumeReceivingFromPausedConsumersIfNeeded();
+        }
+    }
+
+    /**
+     * Requests redelivery of the given messages. For non-Shared subscriptions single messages
+     * cannot be redelivered, so everything unacknowledged is redelivered instead.
+     *
+     * @param messageIds ids of type {@link TopicMessageIdImpl}; an empty set is a no-op. The set
+     *                   may be extended by {@code removeExpiredMessagesFromQueue}.
+     */
+    @Override
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        if (messageIds.isEmpty()) {
+            // Nothing to redeliver; previously this case failed with NoSuchElementException.
+            return;
+        }
+        checkArgument(messageIds.iterator().next() instanceof TopicMessageIdImpl);
+
+        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+            // We cannot redeliver single messages if subscription type is not Shared
+            redeliverUnacknowledgedMessages();
+            return;
+        }
+        removeExpiredMessagesFromQueue(messageIds);
+
+        // Group the ids by topic and hand each internal consumer the inner ids it owns.
+        messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId)
+            .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
+            .forEach((topicName, topicMessageIds) -> {
+                ConsumerImpl consumer = consumers.get(topicName);
+                if (consumer == null) {
+                    // The topic is no longer part of this consumer (e.g. it was unsubscribed).
+                    log.warn("[{}] [{}] Skip redelivery for topic {}: no consumer found",
+                        topic, subscription, topicName);
+                    return;
+                }
+                consumer.redeliverUnacknowledgedMessages(topicMessageIds.stream()
+                    .map(mid -> mid.getInnerMessageId())
+                    .collect(Collectors.toSet()));
+            });
+        resumeReceivingFromPausedConsumersIfNeeded();
+    }
+
+    /**
+     * Blocking variant of {@link #seekAsync(MessageId)}.
+     *
+     * @throws PulsarClientException wrapping the cause of the failed future, or the
+     *         InterruptedException if the calling thread was interrupted while waiting
+     */
+    @Override
+    public void seek(MessageId messageId) throws PulsarClientException {
+        try {
+            seekAsync(messageId).get();
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /** Seek is not available on a topics consumer; the returned future is always failed. */
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        PulsarClientException unsupported =
+            new PulsarClientException("Seek operation not supported on topics consumer");
+        return FutureUtil.failedFuture(unsupported);
+    }
+
+    /**
+     * Helper that reports on the batch-ack tracking state of all internal consumers.
+     *
+     * @return true if every internal consumer reports an empty batching ack tracker
+     */
+    public boolean isBatchingAckTrackerEmpty() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.isBatchingAckTrackerEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /** @return the sum of the available permits of all internal consumers */
+    @Override
+    public int getAvailablePermits() {
+        int totalPermits = 0;
+        for (ConsumerImpl consumer : consumers.values()) {
+            totalPermits += consumer.getAvailablePermits();
+        }
+        return totalPermits;
+    }
+
+    /** @return true only when every internal consumer has reached the end of its topic */
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.hasReachedEndOfTopic()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the number of messages in this consumer's shared queue plus those still queued in
+     *         each internal consumer
+     */
+    @Override
+    public int numMessagesInQueue() {
+        int queued = incomingMessages.size();
+        for (ConsumerImpl consumer : consumers.values()) {
+            queued += consumer.numMessagesInQueue();
+        }
+        return queued;
+    }
+
+    /**
+     * Rebuilds the aggregated statistics from the internal consumers.
+     *
+     * @return the reset-and-refilled stats object, or null when no stats object exists
+     */
+    @Override
+    public synchronized ConsumerStats getStats() {
+        if (stats == null) {
+            return null;
+        }
+
+        stats.reset();
+        for (ConsumerImpl consumer : consumers.values()) {
+            stats.updateCumulativeStats(consumer.getStats());
+        }
+        return stats;
+    }
+
+    /** @return the tracker this consumer uses for delivered but not yet acknowledged messages */
+    public UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
+    /**
+     * Drops messages from the head of the incoming queue for as long as their ids are contained
+     * in {@code messageIds}. The first polled message whose id is not in the set has its id added
+     * to the set and ends the scan.
+     *
+     * @param messageIds ids considered expired; may gain one extra id as described above
+     */
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
+        Message head = incomingMessages.peek();
+        if (head == null) {
+            return;
+        }
+        if (!messageIds.contains(head.getMessageId())) {
+            // first message is not expired, then no message is expired in queue.
+            return;
+        }
+
+        // try not to remove elements that are added while we remove
+        Message current = incomingMessages.poll();
+        checkState(current instanceof TopicMessageImpl);
+        for (; current != null; current = incomingMessages.poll()) {
+            MessageId currentId = current.getMessageId();
+            if (!messageIds.contains(currentId)) {
+                messageIds.add(currentId);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Validates a topic name before it is added to this consumer. Every violation is reported
+     * through {@code checkArgument}; when all checks pass the method returns true.
+     *
+     * @param topicName the candidate topic name
+     * @return true when the name is valid, not yet subscribed, and in the namespace already
+     *         recorded for this consumer (if any)
+     */
+    private boolean topicNameValid(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+        checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);
+
+        if (this.namespaceName == null) {
+            // no namespace recorded yet, so there is nothing to compare against
+            return true;
+        }
+
+        String candidateNamespace = DestinationName.get(topicName).getNamespace().toString();
+        boolean sameNamespace = candidateNamespace.equals(this.namespaceName.toString());
+        checkArgument(sameNamespace,
+            "Topic " + topicName + " not in same namespace with Topics");
+        return true;
+    }
+
+    /**
+     * Subscribes this topics consumer to one more topic.
+     *
+     * <p>The topic's partition metadata is looked up first. When it reports more than one
+     * partition, one internal consumer is created per partition; otherwise a single internal
+     * consumer is created for the topic itself. Once all of them are subscribed, message
+     * receiving is started for the new consumers.
+     *
+     * @param topicName the topic to add; checked by {@code topicNameValid}, which reports
+     *                  violations through {@code checkArgument}
+     * @return a future completed when the topic has been added, or completed exceptionally when
+     *         this consumer is closing/closed, the metadata lookup fails, or subscribing fails
+     */
+    public CompletableFuture<Void> subscribeAsync(String topicName) {
+        if (!topicNameValid(topicName)) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topic name not valid"));
+        }
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+
+        client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Received topic {} metadata.partitions: {}", topicName, metadata.partitions);
+            }
+
+            List<CompletableFuture<Consumer>> futureList;
+
+            if (metadata.partitions > 1) {
+                this.topics.putIfAbsent(topicName, metadata.partitions);
+                numberTopicPartitions.addAndGet(metadata.partitions);
+
+                // one internal consumer per partition of the topic
+                futureList = IntStream
+                    .range(0, metadata.partitions)
+                    .mapToObj(
+                        partitionIndex -> {
+                            String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
+                            CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
+                            ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
+                                client.externalExecutorProvider().getExecutor(), partitionIndex,
+                                subFuture);
+                            consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+                            return subFuture;
+                        })
+                    .collect(Collectors.toList());
+            } else {
+                this.topics.putIfAbsent(topicName, 1);
+                numberTopicPartitions.incrementAndGet();
+
+                CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
+                ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, subscription, internalConfig,
+                    client.externalExecutorProvider().getExecutor(), 0,
+                    subFuture);
+                consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+                futureList = Lists.newArrayList(subFuture);
+            }
+
+            FutureUtil.waitForAll(futureList)
+                .thenAccept(finalFuture -> {
+                    try {
+                        // grow the receiver queue so that it holds at least one slot per partition
+                        if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                            setMaxReceiverQueueSize(numberTopicPartitions.get());
+                        }
+                        int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                        checkState(numberTopicPartitions.get() == numTopics,
+                            "numberTopicPartitions " + numberTopicPartitions.get()
+                                + " not equals expected: " + numTopics);
+
+                        // We have successfully created new consumers, so we can start receiving messages for them
+                        startReceivingMessages(
+                            consumers.values().stream()
+                                .filter(consumer1 -> DestinationName.get(consumer1.getTopic())
+                                    .getPartitionedTopicName().equals(topicName))
+                                .collect(Collectors.toList()));
+
+                        subscribeResult.complete(null);
+                        log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, numberTopicPartitions {}",
+                            topic, subscription, topicName, numberTopicPartitions.get());
+                        if (this.namespaceName == null) {
+                            this.namespaceName = DestinationName.get(topicName).getNamespaceObject();
+                        }
+                    } catch (PulsarClientException e) {
+                        try {
+                            handleSubscribeOneTopicError(topicName, e);
+                        } finally {
+                            // complete the caller's future even if the cleanup itself throws
+                            subscribeResult.completeExceptionally(e);
+                        }
+                    }
+                })
+                .exceptionally(ex -> {
+                    try {
+                        handleSubscribeOneTopicError(topicName, ex);
+                    } finally {
+                        // complete the caller's future even if the cleanup itself throws;
+                        // otherwise the caller would wait on subscribeResult forever
+                        subscribeResult.completeExceptionally(ex);
+                    }
+                    return null;
+                });
+        }).exceptionally(ex1 -> {
+            log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
+            subscribeResult.completeExceptionally(ex1);
+            return null;
+        });
+
+        return subscribeResult;
+    }
+
+    /**
+     * Rolls back a failed {@code subscribeAsync(topicName)}: closes and removes every internal
+     * consumer created for the topic, releases the partition count reserved for it, and removes
+     * the topic from {@code topics}.
+     *
+     * @param topicName the topic whose subscription failed
+     * @param error     the failure; also used to fail each removed consumer's subscribe future
+     * @throws IllegalStateException if the partition counter and the number of remaining internal
+     *         consumers disagree after the rollback
+     */
+    private void handleSubscribeOneTopicError(String topicName, Throwable error) {
+        // The format needs a third placeholder, otherwise the error message is silently dropped.
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer: {}",
+            topic, topicName, error.getMessage());
+
+        // Snapshot the consumers to remove first, then mutate the map.
+        List<ConsumerImpl> consumersOfTopic = consumers.values().stream()
+            .filter(consumer1 -> DestinationName.get(consumer1.getTopic())
+                .getPartitionedTopicName().equals(topicName))
+            .collect(Collectors.toList());
+
+        consumersOfTopic.forEach(consumer2 -> {
+            consumer2.closeAsync().handle((ok, closeException) -> {
+                consumer2.subscribeFuture().completeExceptionally(error);
+                return null;
+            });
+            // subscribeAsync reserved one slot per internal consumer in numberTopicPartitions;
+            // give it back for each consumer actually removed so the check below can hold.
+            if (consumers.remove(consumer2.getTopic()) != null) {
+                numberTopicPartitions.decrementAndGet();
+            }
+        });
+
+        topics.remove(topicName);
+        checkState(numberTopicPartitions.get() == consumers.values().size());
+    }
+
+    /**
+     * Unsubscribes this topics consumer from a single topic.
+     *
+     * <p>All internal consumers whose partitioned topic name matches the given topic are
+     * unsubscribed. On success they are removed from the bookkeeping structures together with
+     * the topic's tracked un-acked messages; on failure this consumer is moved to Failed.
+     *
+     * @param topicName the topic to remove; must be a valid topic name
+     * @return a future mirroring the combined outcome; fails immediately with
+     *         AlreadyClosedException when this consumer is closing or closed
+     */
+    public CompletableFuture<Void> unsubscribeAsync(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> topicUnsubscribed = new CompletableFuture<>();
+        String partitionedName = DestinationName.get(topicName).getPartitionedTopicName();
+
+        // every internal consumer that serves the topic (one per partition)
+        List<ConsumerImpl> matchingConsumers = consumers.values().stream()
+            .filter(candidate -> DestinationName.get(candidate.getTopic())
+                .getPartitionedTopicName().equals(partitionedName))
+            .collect(Collectors.toList());
+
+        List<CompletableFuture<Void>> pendingUnsubscribes = matchingConsumers.stream()
+            .map(ConsumerImpl::unsubscribeAsync)
+            .collect(Collectors.toList());
+
+        FutureUtil.waitForAll(pendingUnsubscribes).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                topicUnsubscribed.completeExceptionally(failure);
+                setState(State.Failed);
+                log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                    topicName, subscription, consumerName, failure.getCause());
+                return;
+            }
+
+            for (ConsumerImpl removed : matchingConsumers) {
+                consumers.remove(removed.getTopic());
+                pausedConsumers.remove(removed);
+                numberTopicPartitions.decrementAndGet();
+            }
+
+            topics.remove(topicName);
+            ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+
+            topicUnsubscribed.complete(null);
+            log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, numberTopicPartitions: {}",
+                topicName, subscription, consumerName, numberTopicPartitions);
+        });
+
+        return topicUnsubscribed;
+    }
+
+    /** @return a new list holding the names of the topics currently recorded in {@code topics} */
+    public List<String> getTopics() {
+        return Lists.newArrayList(topics.keySet());
+    }
+
+    /** @return a new list holding the topic name of every internal consumer */
+    public List<String> getPartitionedTopics() {
+        return Lists.newArrayList(consumers.keySet());
+    }
+
+    /** @return a new list holding every internal consumer */
+    public List<ConsumerImpl> getConsumers() {
+        return Lists.newArrayList(consumers.values());
+    }
+
+    // class-wide SLF4J logger
+    private static final Logger log =
+        LoggerFactory.getLogger(TopicsConsumerImpl.class);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 796abf6..0066e72 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,28 +18,25 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(UnAckedMessageTracker.class);
-    private ConcurrentOpenHashSet<MessageIdImpl> currentSet;
-    private ConcurrentOpenHashSet<MessageIdImpl> oldOpenSet;
+    protected ConcurrentOpenHashSet<MessageId> currentSet;
+    protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
     private final ReentrantReadWriteLock readWriteLock;
-    private final Lock readLock;
+    protected final Lock readLock;
     private final Lock writeLock;
     private Timeout timeout;
 
@@ -51,17 +48,17 @@ public class UnAckedMessageTracker implements Closeable {
         }
 
         @Override
-        public boolean add(MessageIdImpl m) {
+        public boolean add(MessageId m) {
             return true;
         }
 
         @Override
-        public boolean remove(MessageIdImpl m) {
+        public boolean remove(MessageId m) {
             return true;
         }
 
         @Override
-        public int removeMessagesTill(MessageIdImpl msgId) {
+        public int removeMessagesTill(MessageId msgId) {
             return 0;
         }
 
@@ -77,8 +74,8 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase 
consumerBase, long ackTimeoutMillis) {
-        currentSet = new ConcurrentOpenHashSet<MessageIdImpl>();
-        oldOpenSet = new ConcurrentOpenHashSet<MessageIdImpl>();
+        currentSet = new ConcurrentOpenHashSet<MessageId>();
+        oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
         readWriteLock = new ReentrantReadWriteLock();
         readLock = readWriteLock.readLock();
         writeLock = readWriteLock.writeLock();
@@ -92,7 +89,7 @@ public class UnAckedMessageTracker implements Closeable {
             public void run(Timeout t) throws Exception {
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, 
oldOpenSet.size());
-                    Set<MessageIdImpl> messageIds = new HashSet<>();
+                    Set<MessageId> messageIds = new HashSet<>();
                     oldOpenSet.forEach(messageIds::add);
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
@@ -106,7 +103,7 @@ public class UnAckedMessageTracker implements Closeable {
     void toggle() {
         writeLock.lock();
         try {
-            ConcurrentOpenHashSet<MessageIdImpl> temp = currentSet;
+            ConcurrentOpenHashSet<MessageId> temp = currentSet;
             currentSet = oldOpenSet;
             oldOpenSet = temp;
         } finally {
@@ -124,7 +121,7 @@ public class UnAckedMessageTracker implements Closeable {
         }
     }
 
-    public boolean add(MessageIdImpl m) {
+    public boolean add(MessageId m) {
         readLock.lock();
         try {
             oldOpenSet.remove(m);
@@ -144,7 +141,7 @@ public class UnAckedMessageTracker implements Closeable {
         }
     }
 
-    public boolean remove(MessageIdImpl m) {
+    public boolean remove(MessageId m) {
         readLock.lock();
         try {
             return currentSet.remove(m) || oldOpenSet.remove(m);
@@ -171,15 +168,12 @@ public class UnAckedMessageTracker implements Closeable {
         }
     }
 
-    public int removeMessagesTill(MessageIdImpl msgId) {
+    public int removeMessagesTill(MessageId msgId) {
         readLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> 
((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && 
m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> 
((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && 
m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> 
(m.compareTo(msgId) <= 0));
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> 
(m.compareTo(msgId) <= 0));
+
             return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
         } finally {
             readLock.unlock();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
new file mode 100644
index 0000000..eceedf6
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Un-acked message tracker whose entries are {@link TopicMessageIdImpl} instances, adding the
+ * ability to drop all tracked messages that belong to one topic.
+ */
+public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
+
+    // NOTE(review): assumed to be the separator DestinationName puts between a topic name and a
+    // partition index when it builds partition names ("<topic>-partition-<n>") — confirm.
+    private static final String PARTITION_SUFFIX = "-partition-";
+
+    public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
+        super(client, consumerBase, ackTimeoutMillis);
+    }
+
+    /**
+     * Removes every tracked message id that belongs to the given topic or to one of its
+     * partitions.
+     *
+     * @param topicName name of the topic whose messages should no longer be tracked
+     * @return number of message ids removed from both tracking sets
+     * @throws IllegalStateException if a tracked id is not a {@link TopicMessageIdImpl}
+     */
+    public int removeTopicMessages(String topicName) {
+        readLock.lock();
+        try {
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> belongsToTopic(m, topicName));
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> belongsToTopic(m, topicName));
+
+            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Tells whether a tracked id was produced by the given topic. A plain substring test is not
+     * enough: it would also match unrelated topics that merely contain the name, e.g. removing
+     * ".../topic-1" would drop the messages of ".../topic-10" as well.
+     */
+    private static boolean belongsToTopic(Object messageId, String topicName) {
+        checkState(messageId instanceof TopicMessageIdImpl,
+            "message should be of type TopicMessageIdImpl");
+        String messageTopic = ((TopicMessageIdImpl) messageId).getTopicName();
+        return messageTopic.equals(topicName)
+            || messageTopic.startsWith(topicName + PARTITION_SUFFIX);
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to