sijie closed pull request #1996: Cpp client: add multiTopicsConsumer
URL: https://github.com/apache/incubator-pulsar/pull/1996
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide the original diff once it is merged):

diff --git a/pulsar-client-cpp/include/pulsar/Client.h 
b/pulsar-client-cpp/include/pulsar/Client.h
index 07b4355fa9..6a9e4878b7 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,15 @@ class Client {
     void subscribeAsync(const std::string& topic, const std::string& 
consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                     Consumer& consumer);
+    Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                     const ConsumerConfiguration& conf, Consumer& consumer);
+    void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                        SubscribeCallback callback);
+    void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                        const ConsumerConfiguration& conf, SubscribeCallback 
callback);
+
     /**
      * Create a topic reader with given {@code ReaderConfiguration} for 
reading messages from the specified
      * topic.
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h 
b/pulsar-client-cpp/include/pulsar/Consumer.h
index 4272166864..4486515440 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -243,6 +243,7 @@ class Consumer {
     friend class PulsarFriend;
     friend class PulsarWrapper;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class ConsumerImpl;
     friend class ClientImpl;
     friend class ConsumerTest;
diff --git a/pulsar-client-cpp/include/pulsar/Message.h 
b/pulsar-client-cpp/include/pulsar/Message.h
index aff0d94933..a3b9af0fb3 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -134,6 +134,7 @@ class Message {
             proto::SingleMessageMetadata& singleMetadata);
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class MessageBuilder;
     friend class ConsumerImpl;
     friend class ProducerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h 
b/pulsar-client-cpp/include/pulsar/MessageId.h
index dfe3a51fca..e9ff133684 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -22,6 +22,7 @@
 #include <iosfwd>
 #include <stdint.h>
 #include <boost/shared_ptr.hpp>
+//#include <lib/MessageIdImpl.h>
 
 #pragma GCC visibility push(default)
 
@@ -50,6 +51,16 @@ class MessageId {
      */
     void serialize(std::string& result) const;
 
+    /**
+     * Get the topic Name
+     */
+    const std::string& getTopicName() const;
+
+    /**
+     * Set the topicName
+     */
+    void setTopicName(const std::string& topicName);
+
     /**
      * Deserialize a message id from a binary string
      */
@@ -71,6 +82,7 @@ class MessageId {
     friend class Commands;
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
     friend class BatchAcknowledgementTracker;
     friend class PulsarWrapper;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index 5936e48080..bba35203e1 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -90,6 +90,30 @@ void Client::subscribeAsync(const std::string& topic, const 
std::string& consume
     impl_->subscribeAsync(topic, consumerName, conf, callback);
 }
 
+Result Client::subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                         Consumer& consumer) {
+    return subscribe(topics, subscriptionName, ConsumerConfiguration(), 
consumer);
+}
+
+Result Client::subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                         const ConsumerConfiguration& conf, Consumer& 
consumer) {
+    Promise<Result, Consumer> promise;
+    subscribeAsync(topics, subscriptionName, conf, 
WaitForCallbackValue<Consumer>(promise));
+    Future<Result, Consumer> future = promise.getFuture();
+
+    return future.get(consumer);
+}
+
+void Client::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                            SubscribeCallback callback) {
+    subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), 
callback);
+}
+
+void Client::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
+                            const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    impl_->subscribeAsync(topics, subscriptionName, conf, callback);
+}
+
 Result Client::createReader(const std::string& topic, const MessageId& 
startMessageId,
                             const ReaderConfiguration& conf, Reader& reader) {
     Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index 1d46cd94f7..376892633b 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -24,6 +24,7 @@
 #include "ReaderImpl.h"
 #include "PartitionedProducerImpl.h"
 #include "PartitionedConsumerImpl.h"
+#include "MultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
 #include "Log4CxxLogger.h"
 #include <boost/bind.hpp>
@@ -33,6 +34,7 @@
 #include "boost/date_time/posix_time/posix_time.hpp"
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
+#include <algorithm>
 
 DECLARE_LOG_OBJECT()
 
@@ -210,6 +212,40 @@ void ClientImpl::handleReaderMetadataLookup(const Result 
result, const LookupDat
     consumers_.push_back(reader->getConsumer());
 }
 
+void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
+                                const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    TopicNamePtr topicNamePtr;
+
+    Lock lock(mutex_);
+    if (state_ != Open) {
+        lock.unlock();
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    } else {
+        if (!topics.empty() && !(topicNamePtr = 
MultiTopicsConsumerImpl::topicNamesValid(topics))) {
+            lock.unlock();
+            callback(ResultInvalidTopicName, Consumer());
+            return;
+        }
+    }
+
+    if (topicNamePtr) {
+        std::string randomName = generateRandomName();
+        std::stringstream consumerTopicNameStream;
+        consumerTopicNameStream << topicNamePtr->toString() << 
"-TopicsConsumerFakeName-" << randomName;
+        topicNamePtr = TopicName::get(consumerTopicNameStream.str());
+    }
+
+    ConsumerImplBasePtr consumer = boost::make_shared<MultiTopicsConsumerImpl>(
+        shared_from_this(), topics, consumerName, topicNamePtr, conf, 
lookupServicePtr_);
+
+    consumer->getConsumerCreatedFuture().addListener(
+        boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), 
_1, _2, callback, consumer));
+    consumers_.push_back(consumer);
+    lock.unlock();
+    consumer->start();
+}
+
 void ClientImpl::subscribeAsync(const std::string& topic, const std::string& 
consumerName,
                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
     TopicNamePtr topicName;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h 
b/pulsar-client-cpp/lib/ClientImpl.h
index 5283b58729..550298b67f 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -57,6 +57,9 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
     void subscribeAsync(const std::string& topic, const std::string& 
consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
+                        const ConsumerConfiguration& conf, SubscribeCallback 
callback);
+
     void createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback 
callback);
 
diff --git a/pulsar-client-cpp/lib/MessageId.cc 
b/pulsar-client-cpp/lib/MessageId.cc
index c5314d89a6..53946f8d16 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -130,5 +130,9 @@ bool MessageId::operator==(const MessageId& other) const {
 
 bool MessageId::operator!=(const MessageId& other) const { return !(*this == 
other); }
 
+const std::string& MessageId::getTopicName() const { return 
impl_->getTopicName(); }
+
+void MessageId::setTopicName(const std::string& topicName) { return 
impl_->setTopicName(topicName); }
+
 #pragma GCC visibility pop
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageIdImpl.h 
b/pulsar-client-cpp/lib/MessageIdImpl.h
index a3fc1714a5..ae33da4cfe 100644
--- a/pulsar-client-cpp/lib/MessageIdImpl.h
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -25,12 +25,25 @@ namespace pulsar {
 
 class MessageIdImpl {
    public:
-    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), 
batchIndex_(-1) {}
+    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), 
batchIndex_(-1), topicName_() {}
     MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, 
int32_t batchIndex)
-        : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), 
batchIndex_(batchIndex) {}
+        : ledgerId_(ledgerId),
+          entryId_(entryId),
+          partition_(partition),
+          batchIndex_(batchIndex),
+          topicName_() {}
     const int64_t ledgerId_;
     const int64_t entryId_;
     const int32_t partition_;
     const int32_t batchIndex_;
+
+    const std::string& getTopicName() { return *topicName_; }
+    void setTopicName(const std::string& topicName) { topicName_ = &topicName; 
}
+
+   private:
+    const std::string* topicName_;
+    friend class MessageImpl;
+    friend class MultiTopicsConsumerImpl;
+    friend class UnAckedMessageTrackerEnabled;
 };
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc 
b/pulsar-client-cpp/lib/MessageImpl.cc
index 569a30ac17..9b59eff4d1 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -20,7 +20,7 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0) {}
+MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), 
topicName_() {}
 
 const Message::StringMap& MessageImpl::properties() {
     if (properties_.size() == 0) {
@@ -78,4 +78,12 @@ void MessageImpl::setPartitionKey(const std::string& 
partitionKey) {
 }
 
 void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { 
metadata.set_event_time(eventTimestamp); }
+
+void MessageImpl::setTopicName(const std::string& topicName) {
+    topicName_ = &topicName;
+    messageId.setTopicName(topicName);
+}
+
+const std::string& MessageImpl::getTopicName() { return *topicName_; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h 
b/pulsar-client-cpp/lib/MessageImpl.h
index f3753b1cf7..0ef63e8dc9 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -43,6 +43,7 @@ class MessageImpl {
     SharedBuffer payload;
     MessageId messageId;
     ClientConnection* cnx_;
+    const std::string* topicName_;
 
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
@@ -50,6 +51,16 @@ class MessageImpl {
     uint64_t getPublishTimestamp() const;
     uint64_t getEventTimestamp() const;
 
+    /**
+     * Get a valid topicName
+     */
+    const std::string& getTopicName();
+
+    /**
+     * Set a valid topicName
+     */
+    void setTopicName(const std::string& topicName);
+
     friend class PulsarWrapper;
     friend class MessageBuilder;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
new file mode 100644
index 0000000000..5220307bdb
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <boost/date_time/local_time/local_time.hpp>
+#include <algorithm>
+#include <numeric>
+
+using namespace pulsar;
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::DELIMITER = ";";
+
+MultiTopicsBrokerConsumerStatsImpl::MultiTopicsBrokerConsumerStatsImpl(size_t 
size) {
+    statsList_.resize(size);
+}
+
+bool MultiTopicsBrokerConsumerStatsImpl::isValid() const {
+    bool isValid = true;
+    for (int i = 0; i < statsList_.size(); i++) {
+        isValid = isValid && statsList_[i].isValid();
+    }
+    return isValid;
+}
+
+std::ostream& operator<<(std::ostream& os, const 
MultiTopicsBrokerConsumerStatsImpl& obj) {
+    os << "\nMultiTopicsBrokerConsumerStatsImpl ["
+       << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << 
obj.getMsgRateOut()
+       << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
+       << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
+       << ", consumerName_ = " << obj.getConsumerName()
+       << ", availablePermits_ = " << obj.getAvailablePermits()
+       << ", unackedMessages_ = " << obj.getUnackedMessages()
+       << ", blockedConsumerOnUnackedMsgs_ = " << 
obj.isBlockedConsumerOnUnackedMsgs()
+       << ", address_ = " << obj.getAddress() << ", connectedSince_ = " << 
obj.getConnectedSince()
+       << ", type_ = " << obj.getType() << ", msgRateExpired_ = " << 
obj.getMsgRateExpired()
+       << ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
+    return os;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateOut() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateOut();
+    }
+    return sum;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgThroughputOut() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgThroughputOut();
+    }
+    return sum;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateRedeliver() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateRedeliver();
+    }
+    return sum;
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConsumerName() const {
+    std::string str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str += statsList_[i].getConsumerName() + DELIMITER;
+    }
+    return str;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getAvailablePermits() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getAvailablePermits();
+    }
+    return sum;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getUnackedMessages() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getUnackedMessages();
+    }
+    return sum;
+}
+
+bool MultiTopicsBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() 
const {
+    if (statsList_.size() == 0) {
+        return false;
+    }
+
+    return isValid();
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getAddress() const {
+    std::stringstream str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str << statsList_[i].getAddress() << DELIMITER;
+    }
+    return str.str();
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConnectedSince() 
const {
+    std::stringstream str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str << statsList_[i].getConnectedSince() << DELIMITER;
+    }
+    return str.str();
+}
+
+const ConsumerType MultiTopicsBrokerConsumerStatsImpl::getType() const {
+    if (!statsList_.size()) {
+        return ConsumerExclusive;
+    }
+    return statsList_[0].getType();
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateExpired() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateExpired();
+    }
+    return sum;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getMsgBacklog() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgBacklog();
+    }
+    return sum;
+}
+
+BrokerConsumerStats 
MultiTopicsBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) {
+    return statsList_[index];
+}
+
+void MultiTopicsBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int 
index) {
+    statsList_[index] = stats;
+}
+
+void MultiTopicsBrokerConsumerStatsImpl::clear() { statsList_.clear(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
new file mode 100644
index 0000000000..568cda1715
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+#define PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+
+#include <string.h>
+#include <iostream>
+#include <vector>
+#include <pulsar/Result.h>
+#include <boost/function.hpp>
+#include <boost/date_time/microsec_time_clock.hpp>
+#include <lib/BrokerConsumerStatsImpl.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
+#pragma GCC visibility push(default)
+namespace pulsar {
+class MultiTopicsBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
+   private:
+    std::vector<BrokerConsumerStats> statsList_;
+    static const std::string DELIMITER;
+
+   public:
+    MultiTopicsBrokerConsumerStatsImpl(size_t size);
+
+    /** Returns true if the Stats are still valid **/
+    virtual bool isValid() const;
+
+    /** Returns the rate of messages delivered to the consumer. msg/s */
+    virtual double getMsgRateOut() const;
+
+    /** Returns the throughput delivered to the consumer. bytes/s */
+    virtual double getMsgThroughputOut() const;
+
+    /** Returns the rate of messages redelivered by this consumer. msg/s */
+    virtual double getMsgRateRedeliver() const;
+
+    /** Returns the Name of the consumer */
+    virtual const std::string getConsumerName() const;
+
+    /** Returns the Number of available message permits for the consumer */
+    virtual uint64_t getAvailablePermits() const;
+
+    /** Returns the Number of unacknowledged messages for the consumer */
+    virtual uint64_t getUnackedMessages() const;
+
+    /** Returns true if the consumer is blocked due to unacked messages.  */
+    virtual bool isBlockedConsumerOnUnackedMsgs() const;
+
+    /** Returns the Address of this consumer */
+    virtual const std::string getAddress() const;
+
+    /** Returns the Timestamp of connection */
+    virtual const std::string getConnectedSince() const;
+
+    /** Returns Whether this subscription is Exclusive or Shared or Failover */
+    virtual const ConsumerType getType() const;
+
+    /** Returns the rate of messages expired on this subscription. msg/s */
+    virtual double getMsgRateExpired() const;
+
+    /** Returns the Number of messages in the subscription backlog */
+    virtual uint64_t getMsgBacklog() const;
+
+    /** Returns the BrokerConsumerStats stored at the given index */
+    BrokerConsumerStats getBrokerConsumerStats(int index);
+
+    void add(BrokerConsumerStats stats, int index);
+
+    void clear();
+
+    friend std::ostream &operator<<(std::ostream &os, const 
MultiTopicsBrokerConsumerStatsImpl &obj);
+};
+typedef boost::shared_ptr<MultiTopicsBrokerConsumerStatsImpl> 
MultiTopicsBrokerConsumerStatsPtr;
+}  // namespace pulsar
+#pragma GCC visibility pop
+#endif  // PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000000..7be197c89c
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const 
std::vector<std::string>& topics,
+                                                 const std::string& 
subscriptionName, TopicNamePtr topicName,
+                                                 const ConsumerConfiguration& 
conf,
+                                                 const LookupServicePtr 
lookupServicePtr)
+    : client_(client),
+      subscriptionName_(subscriptionName),
+      topic_(topicName ? topicName->toString() : "EmptyTopics"),
+      conf_(conf),
+      state_(Pending),
+      messages_(1000),
+      listenerExecutor_(client->getListenerExecutorProvider()->get()),
+      messageListener_(conf.getMessageListener()),
+      namespaceName_(topicName ? topicName->getNamespaceName() : 
boost::shared_ptr<NamespaceName>()),
+      lookupServicePtr_(lookupServicePtr),
+      numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)),
+      topics_(topics) {
+    std::stringstream consumerStrStream;
+    consumerStrStream << "[Muti Topics Consumer: "
+                      << "TopicName - " << topic_ << " - Subscription - " << 
subscriptionName << "]";
+    consumerStr_ = consumerStrStream.str();
+
+    if (conf.getUnAckedMessagesTimeoutMs() != 0) {
+        unAckedMessageTrackerPtr_.reset(
+            new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+    } else {
+        unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
+    }
+}
+
+void MultiTopicsConsumerImpl::start() {
+    if (topics_.empty()) {
+        if (compareAndSetState(Pending, Ready)) {
+            LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
+            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            return;
+        } else {
+            LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << 
state_);
+            multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+            return;
+        }
+    }
+
+    // call subscribeOneTopicAsync once for each individual topic
+    int topicsNumber = topics_.size();
+    boost::shared_ptr<std::atomic<int>> topicsNeedCreate = 
boost::make_shared<std::atomic<int>>(topicsNumber);
+    // subscribe for each passed in topic
+    for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr 
!= topics_.end(); itr++) {
+        subscribeOneTopicAsync(*itr).addListener(
+            boost::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed, 
shared_from_this(), _1, _2, *itr,
+                        topicsNeedCreate));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer 
consumer,
+                                                       const std::string& 
topic,
+                                                       
boost::shared_ptr<std::atomic<int>> topicsNeedCreate) {
+    int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Failed when subscribed to topic " << topic << " in 
TopicsConsumer. Error - " << result);
+    }
+
+    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
+
+    if (topicsNeedCreate->load() == 0) {
+        if (compareAndSetState(Pending, Ready)) {
+            LOG_INFO("Successfully Subscribed to Topics");
+            if (!namespaceName_) {
+                namespaceName_ = TopicName::get(topic)->getNamespaceName();
+            }
+            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+        } else {
+            LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " 
Error - " << result);
+            // clean up: close the consumers that were already created
+            ResultCallback nullCallbackForCleanup = NULL;
+            closeAsync(nullCallbackForCleanup);
+            multiTopicsConsumerCreatedPromise_.setFailed(result);
+            return;
+        }
+        return;
+    }
+}
+
+// subscribe for passed in topic
+Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const 
std::string& topic) {
+    TopicNamePtr topicName;
+    ConsumerSubResultPromisePtr topicPromise = 
boost::make_shared<Promise<Result, Consumer>>();
+    if (!(topicName = TopicName::get(topic))) {
+        LOG_ERROR("TopicName invalid: " << topic);
+        topicPromise->setFailed(ResultInvalidTopicName);
+        return topicPromise->getFuture();
+    }
+
+    if (namespaceName_ && !(*namespaceName_ == 
*(topicName->getNamespaceName()))) {
+        LOG_ERROR("TopicName namespace not the same with topicsConsumer. 
wanted namespace: "
+                  << namespaceName_->toString() << " this topic: " << topic);
+        topicPromise->setFailed(ResultInvalidTopicName);
+        return topicPromise->getFuture();
+    }
+
+    if (state_ == Closed || state_ == Closing) {
+        LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
+        topicPromise->setFailed(ResultAlreadyClosed);
+        return topicPromise->getFuture();
+    }
+
+    // subscribe to each partition; complete the promise once all are done
+    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        boost::bind(&MultiTopicsConsumerImpl::subscribeTopicPartitions, 
shared_from_this(), _1, _2, topicName,
+                    subscriptionName_, conf_, topicPromise));
+    return topicPromise->getFuture();
+}
+
+void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
+                                                       const 
LookupDataResultPtr partitionMetadata,
+                                                       TopicNamePtr topicName,
+                                                       const std::string& 
consumerName,
+                                                       ConsumerConfiguration 
conf,
+                                                       
ConsumerSubResultPromisePtr topicSubResultPromise) {
+    if (result != ResultOk) {
+        LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics 
Subscribing- "
+                  << consumerStr_ << " result: " << result)
+        topicSubResultPromise->setFailed(result);
+        return;
+    }
+
+    boost::shared_ptr<ConsumerImpl> consumer;
+    ConsumerConfiguration config;
+    ExecutorServicePtr internalListenerExecutor = 
client_->getPartitionListenerExecutorProvider()->get();
+
+    // all the consumers should have same name.
+    config.setConsumerName(conf_.getConsumerName());
+    config.setConsumerType(conf_.getConsumerType());
+    
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
+    config.setMessageListener(
+        boost::bind(&MultiTopicsConsumerImpl::messageReceived, 
shared_from_this(), _1, _2));
+    config.setReceiverQueueSize(conf_.getReceiverQueueSize());
+
+    int numPartitions = partitionMetadata->getPartitions() >= 1 ? 
partitionMetadata->getPartitions() : 1;
+
+    Lock lock(mutex_);
+    topicsPartitions_.insert(std::make_pair(topicName->toString(), 
numPartitions));
+    lock.unlock();
+    numberTopicPartitions_->fetch_add(numPartitions);
+
+    boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
+        boost::make_shared<std::atomic<int>>(numPartitions);
+
+    for (int i = 0; i < numPartitions; i++) {
+        std::string topicPartitionName = topicName->getTopicPartitionName(i);
+        consumer = boost::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
+                                                    internalListenerExecutor, 
Partitioned);
+        consumer->getConsumerCreatedFuture().addListener(
+            boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(), _1, _2,
+                        partitionsNeedCreate, topicSubResultPromise));
+        consumer->setPartitionIndex(i);
+        consumers_.insert(std::make_pair(topicPartitionName, consumer));
+        LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " << 
consumerStr_);
+        consumer->start();
+    }
+}
+
+// Listener invoked once for each per-partition ConsumerImpl created for a single topic.
+//
+// result                - outcome of creating that one partition consumer.
+// partitionsNeedCreate  - shared countdown of partition consumers still pending for the topic.
+// topicSubResultPromise - promise for the whole topic subscription; failed on the first error
+//                         seen here and fulfilled once the countdown reaches zero.
+void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
+    Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+    boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+    ConsumerSubResultPromisePtr topicSubResultPromise) {
+    if (state_ == Failed) {
+        // one of the consumer creation failed, and we are cleaning up
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        LOG_ERROR("Unable to create Consumer " << consumerStr_ << " state == Failed, result: " << result);
+        return;
+    }
+
+    const int previous = partitionsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        topicSubResultPromise->setFailed(result);
+        LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
+        return;
+    }
+
+    LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
+              << "Partitions need to create - " << previous - 1);
+
+    // Decide completion from the value returned by fetch_sub instead of re-loading the counter:
+    // exactly one caller observes previous == 1, whereas a separate load() could read 0 in
+    // several concurrent callbacks.
+    if (previous == 1) {
+        topicSubResultPromise->setValue(Consumer(shared_from_this()));
+    }
+}
+
+// Unsubscribes every underlying partition consumer and reports the aggregated outcome through
+// `callback` (invoked from handleUnsubscribedAsync once all of them have answered).
+// Calls `callback(ResultAlreadyClosed)` when the consumer is already Closing or Closed.
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+    LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+
+    Lock lock(mutex_);
+    if (state_ == Closing || state_ == Closed) {
+        LOG_INFO(consumerStr_ << " already closed");
+        lock.unlock();
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    state_ = Closing;
+    // Work on a snapshot: handleUnsubscribedAsync clears consumers_ when the last consumer
+    // answers, which would invalidate a live iterator if that answer arrives synchronously.
+    const ConsumerMap consumers = consumers_;
+    lock.unlock();
+
+    if (consumers.empty()) {
+        // Nothing to unsubscribe: without this branch the callback would never be invoked and
+        // the state would remain Closing forever.
+        setState(Closed);
+        callback(ResultOk);
+        return;
+    }
+
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (ConsumerMap::const_iterator consumer = consumers.begin(); consumer != consumers.end();
+         ++consumer) {
+        LOG_DEBUG("Unsubcribing Consumer - " << consumer->first);
+        (consumer->second)
+            ->unsubscribeAsync(boost::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync,
+                                           shared_from_this(), _1, consumerUnsubed, callback));
+    }
+}
+
+// Completion handler for one partition consumer during unsubscribeAsync().
+//
+// consumerUnsubed counts the answers received so far. When it reaches the current value of
+// numberTopicPartitions_, the internal maps are cleared, the state becomes Closed and `callback`
+// receives ResultOk, or ResultUnknownError if any partition failed.
+void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
+                                                      boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                                      ResultCallback callback) {
+    const int total = numberTopicPartitions_->load();
+    const int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < total);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " subscription - " << subscriptionName_);
+    }
+
+    // Use the value returned by fetch_add so that exactly one handler runs the completion path;
+    // re-loading the counter could let several concurrent handlers invoke the callback.
+    if (previous + 1 == total) {
+        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
+        setState(Closed);
+        callback(result1);
+        return;
+    }
+}
+
+// Unsubscribes all partition consumers that belong to a single topic of this consumer.
+//
+// `callback` receives:
+//   ResultTopicNotFound  - the topic is not in topicsPartitions_;
+//   ResultAlreadyClosed  - this consumer is Closing or Closed;
+//   ResultUnknownError   - the topic name cannot be parsed or a partition consumer is missing;
+//   otherwise the aggregated result reported by handleOneTopicUnsubscribedAsync.
+void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
+    std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
+    if (it == topicsPartitions_.end()) {
+        LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
+                                                               << subscriptionName_);
+        callback(ResultTopicNotFound);
+        return;
+    }
+
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
+                                                                           << subscriptionName_);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    TopicNamePtr topicName;
+    if (!(topicName = TopicName::get(topic))) {
+        LOG_ERROR("TopicName invalid: " << topic);
+        callback(ResultUnknownError);
+        // Stop here: topicName is null and is dereferenced below.
+        return;
+    }
+    int numberPartitions = it->second;
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (int i = 0; i < numberPartitions; i++) {
+        std::string topicPartitionName = topicName->getTopicPartitionName(i);
+        std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+
+        if (consumers_.end() == iterator) {
+            LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
+            callback(ResultUnknownError);
+            // Stop here: dereferencing the end iterator is undefined behaviour, and the caller
+            // has already been told about the failure.
+            return;
+        }
+
+        (iterator->second)
+            ->unsubscribeAsync(boost::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync,
+                                           shared_from_this(), _1, consumerUnsubed, numberPartitions,
+                                           topicName, topicPartitionName, callback));
+    }
+}
+
+// Completion handler for one partition consumer during unsubscribeOneTopicAsync().
+//
+// Removes the partition consumer from consumers_. Once all `numberPartitions` consumers of the
+// topic have answered, the topic is dropped from topicsPartitions_, numberTopicPartitions_ is
+// reduced accordingly, `callback` is invoked (ResultOk unless the state is Failed) and the
+// topic's messages are removed from the un-acked tracker.
+void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
+    Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions,
+    TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
+    const int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < numberPartitions);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " topicPartitionName - " << topicPartitionName);
+    }
+
+    LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
+
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        iterator->second->pauseMessageListener();
+        consumers_.erase(iterator);
+    }
+
+    // Only the handler that performed the final increment runs the completion path; re-loading
+    // the counter could let several concurrent handlers invoke the callback.
+    if (previous + 1 == numberPartitions) {
+        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
+        {
+            // Look up and erase under the same lock so the iterator cannot be invalidated by a
+            // concurrent insert/erase between the two steps.
+            Lock lock(mutex_);
+            std::map<std::string, int>::iterator it = topicsPartitions_.find(topicNamePtr->toString());
+            if (it != topicsPartitions_.end()) {
+                numberTopicPartitions_->fetch_sub(numberPartitions);
+                topicsPartitions_.erase(it);
+            }
+        }
+        if (state_ != Failed) {
+            callback(ResultOk);
+        } else {
+            callback(ResultUnknownError);
+        }
+        unAckedMessageTrackerPtr_->removeTopicMessage(topicNamePtr->toString());
+        return;
+    }
+}
+
+// Closes every underlying partition consumer; `callback` is invoked from
+// handleSingleConsumerClose once the last one has been closed.
+// Calls `callback(ResultAlreadyClosed)` when already Closing/Closed or when there are no
+// underlying consumers to close.
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed "
+                  << " topic" << topic_ << " consumer - " << consumerStr_);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    setState(Closing);
+
+    if (consumers_.empty()) {
+        LOG_ERROR("TopicsConsumer have no consumers to close "
+                  << " topic" << topic_ << " subscription - " << subscriptionName_);
+        setState(Closed);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    // close successfully subscribed consumers
+    // Iterate over a snapshot: handleSingleConsumerClose erases entries from consumers_, which
+    // would invalidate a live iterator if a close completes synchronously.
+    const ConsumerMap consumers = consumers_;
+    for (ConsumerMap::const_iterator consumer = consumers.begin(); consumer != consumers.end();
+         ++consumer) {
+        std::string topicPartitionName = consumer->first;
+        ConsumerImplPtr consumerPtr = consumer->second;
+
+        consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
+                                            shared_from_this(), _1, topicPartitionName, callback));
+    }
+}
+
+// Completion handler for one partition consumer during closeAsync().
+//
+// Removes the consumer from consumers_ and decrements numberTopicPartitions_. When the counter
+// reaches zero the remaining bookkeeping is cleared, the state becomes Closed (unless a failure
+// was recorded) and `callback`, if set, receives the result of this last close.
+void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
+                                                        CloseCallback callback) {
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        consumers_.erase(iterator);
+    }
+
+    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
+                                                      << numberTopicPartitions_->load());
+
+    const int previous = numberTopicPartitions_->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
+                                                                 << result);
+    }
+
+    // closed all consumers
+    // Decide from the fetch_sub result so that exactly one handler performs the final cleanup;
+    // a separate load() could read 0 in several concurrent handlers.
+    if (previous == 1) {
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        if (state_ != Failed) {
+            state_ = Closed;
+        }
+
+        // NOTE(review): presumably a no-op when the creation promise was already completed;
+        // confirm against Promise::setFailed.
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+        if (!callback.empty()) {
+            callback(result);
+        }
+        return;
+    }
+}
+
+// Message listener installed on every partition consumer: stamps the message with the name of
+// the topic it came from, queues it, and, when a user listener is configured, schedules
+// internalListener on the listener executor.
+void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
+    LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
+                                                          << " message:" << msg.getDataAsString());
+    msg.impl_->setTopicName(consumer.getTopic());
+    messages_.push(msg);
+
+    if (!messageListener_) {
+        return;
+    }
+    listenerExecutor_->postWork(
+        boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+}
+
+// Pops one queued message and hands it to the user-supplied listener. Exceptions derived from
+// std::exception thrown by the listener are logged and swallowed.
+void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
+    Message next;
+    messages_.pop(next);
+
+    try {
+        messageListener_(Consumer(shared_from_this()), next);
+    } catch (const std::exception& error) {
+        LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << error.what());
+    }
+}
+
+// Blocks until a message is available and stores it in `msg`.
+// Returns ResultAlreadyClosed when the consumer is not Ready, ResultInvalidConfiguration when a
+// message listener is configured, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::receive(Message& msg) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+    // Release mutex_ before blocking: messages_ is already used without it in messageReceived,
+    // and holding it here would stall setState()/isOpen() on other threads until a message
+    // arrives.
+    lock.unlock();
+    messages_.pop(msg);
+
+    unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    return ResultOk;
+}
+
+// Waits up to `timeout` milliseconds for a message and stores it in `msg`.
+// Returns ResultAlreadyClosed when the consumer is not Ready, ResultInvalidConfiguration when a
+// message listener is configured, ResultTimeout when nothing arrived in time, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+    // Release mutex_ before waiting so other threads are not blocked on it for the whole
+    // timeout; messages_ is already used without it in messageReceived.
+    lock.unlock();
+
+    if (messages_.pop(msg, milliseconds(timeout))) {
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        return ResultOk;
+    } else {
+        return ResultTimeout;
+    }
+}
+
+// Acknowledges `msgId` through the partition consumer registered under the message's topic name.
+// `callback` receives ResultAlreadyClosed when this consumer is not Ready and ResultUnknownError
+// when no partition consumer matches the topic name carried by the message id.
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    const std::string& ownerTopic = msgId.getTopicName();
+    std::map<std::string, ConsumerImplPtr>::iterator owner = consumers_.find(ownerTopic);
+
+    if (owner == consumers_.end()) {
+        LOG_ERROR("Message of topic: " << ownerTopic << " not in unAckedMessageTracker");
+        callback(ResultUnknownError);
+        return;
+    }
+
+    unAckedMessageTrackerPtr_->remove(msgId);
+    owner->second->acknowledgeAsync(msgId, callback);
+}
+
+// Cumulative acknowledgement is not supported here: the callback always receives
+// ResultOperationNotSupported and `msgId` is ignored.
+void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
+                                                         ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
+
+// Empty destructor body: no explicit cleanup is performed here.
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
+
+// Returns the future associated with multiTopicsConsumerCreatedPromise_.
+Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
+    Future<Result, ConsumerImplBaseWeakPtr> created = multiTopicsConsumerCreatedPromise_.getFuture();
+    return created;
+}
+// Returns the subscription name this consumer was created with.
+const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const {
+    return subscriptionName_;
+}
+
+// Returns the value of topic_.
+const std::string& MultiTopicsConsumerImpl::getTopic() const {
+    return topic_;
+}
+
+// Returns consumerStr_, the string also used to identify this consumer in log messages.
+const std::string& MultiTopicsConsumerImpl::getName() const {
+    return consumerStr_;
+}
+
+// Stores `state` into state_ while holding mutex_.
+void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
+    Lock guard(mutex_);
+    state_ = state;
+}
+
+// Under mutex_, switches state_ to `update` only if it currently equals `expect`.
+// Returns true when the switch happened, false otherwise.
+bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
+                                                 MultiTopicsConsumerState update) {
+    Lock guard(mutex_);
+    if (state_ != expect) {
+        return false;
+    }
+    state_ = update;
+    return true;
+}
+
+// Intentionally a no-op in this implementation.
+void MultiTopicsConsumerImpl::shutdown() {}
+
+// True when the consumer state is Closed.
+// NOTE(review): unlike isOpen(), state_ is read here without taking mutex_ - confirm whether
+// that is intentional.
+bool MultiTopicsConsumerImpl::isClosed() {
+    return state_ == Closed;
+}
+
+// True when the consumer state, read under mutex_, is Ready.
+bool MultiTopicsConsumerImpl::isOpen() {
+    Lock guard(mutex_);
+    const bool ready = (state_ == Ready);
+    return ready;
+}
+
+// Asks every partition consumer to request messages on its current connection, using the
+// configured receiver queue size as the amount.
+void MultiTopicsConsumerImpl::receiveMessages() {
+    ConsumerMap::const_iterator it = consumers_.begin();
+    while (it != consumers_.end()) {
+        ConsumerImplPtr partitionConsumer = it->second;
+        partitionConsumer->receiveMessages(partitionConsumer->getCnx().lock(), conf_.getReceiverQueueSize());
+        LOG_DEBUG("Sending FLOW command for consumer - " << partitionConsumer->getConsumerId());
+        it++;
+    }
+}
+
+// Pauses the message listener of every partition consumer.
+// Returns ResultInvalidConfiguration when no listener is configured, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::pauseMessageListener() {
+    if (!messageListener_) {
+        return ResultInvalidConfiguration;
+    }
+    ConsumerMap::const_iterator it = consumers_.begin();
+    while (it != consumers_.end()) {
+        it->second->pauseMessageListener();
+        it++;
+    }
+    return ResultOk;
+}
+
+// Resumes the message listener of every partition consumer.
+// Returns ResultInvalidConfiguration when no listener is configured, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::resumeMessageListener() {
+    if (!messageListener_) {
+        return ResultInvalidConfiguration;
+    }
+    ConsumerMap::const_iterator it = consumers_.begin();
+    while (it != consumers_.end()) {
+        it->second->resumeMessageListener();
+        it++;
+    }
+    return ResultOk;
+}
+
+// Forwards the redeliver-unacknowledged-messages request to every partition consumer.
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+    ConsumerMap::const_iterator it = consumers_.begin();
+    while (it != consumers_.end()) {
+        it->second->redeliverUnacknowledgedMessages();
+        it++;
+    }
+}
+
+// Returns the number of messages currently held in the internal messages_ queue.
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const {
+    return messages_.size();
+}
+
+// Requests broker stats from every partition consumer; the per-consumer answers are collected by
+// handleGetConsumerStats. `callback` receives ResultConsumerNotInitialized right away when the
+// consumer is not Ready.
+void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        callback(ResultConsumerNotInitialized, BrokerConsumerStats());
+        return;
+    }
+    // The stats container and the latch are both sized by the partition counter.
+    MultiTopicsBrokerConsumerStatsPtr aggregated =
+        boost::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
+    LatchPtr pending = boost::make_shared<Latch>(numberTopicPartitions_->load());
+    int size = consumers_.size();
+    lock.unlock();
+
+    ConsumerMap::const_iterator it = consumers_.begin();
+    int index = 0;
+    while (index < size) {
+        it->second->getBrokerConsumerStatsAsync(
+            boost::bind(&MultiTopicsConsumerImpl::handleGetConsumerStats, shared_from_this(), _1, _2,
+                        pending, aggregated, index, callback));
+        index++;
+        it++;
+    }
+}
+
+// Collects the stats answer of one partition consumer.
+// A failed answer is forwarded to `callback` immediately with empty stats. A successful one is
+// stored at `index` and counts the latch down; when the latch reaches zero `callback` receives
+// ResultOk with the aggregated stats.
+void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
+                                                     LatchPtr latchPtr,
+                                                     MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
+                                                     BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (res != ResultOk) {
+        lock.unlock();
+        callback(res, BrokerConsumerStats());
+        return;
+    }
+
+    latchPtr->countdown();
+    statsPtr->add(brokerConsumerStats, index);
+
+    if (latchPtr->getCount() != 0) {
+        return;
+    }
+    lock.unlock();
+    callback(ResultOk, BrokerConsumerStats(statsPtr));
+}
+
+// Checks that every entry of `topics` parses as a topic name and that all of them share one
+// namespace. Returns the parsed name of the last entry on success; returns a null pointer when
+// a name is invalid, when namespaces differ, or when `topics` is empty.
+boost::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(
+    const std::vector<std::string>& topics) {
+    TopicNamePtr parsed = boost::shared_ptr<TopicName>();
+    NamespaceNamePtr expectedNamespace = boost::shared_ptr<NamespaceName>();
+
+    for (size_t idx = 0; idx < topics.size(); ++idx) {
+        const std::string& candidate = topics[idx];
+
+        // each name must be parseable on its own
+        parsed = TopicName::get(candidate);
+        if (!parsed) {
+            LOG_ERROR("Topic name invalid when init " << candidate);
+            return boost::shared_ptr<TopicName>();
+        }
+
+        // the first topic fixes the namespace every later topic has to match
+        if (!expectedNamespace) {
+            expectedNamespace = parsed->getNamespaceName();
+            continue;
+        }
+        if (!(*expectedNamespace == *(parsed->getNamespaceName()))) {
+            LOG_ERROR("Different namespace name. expected: " << expectedNamespace->toString() << " now:"
+                                                             << parsed->getNamespaceName()->toString());
+            return boost::shared_ptr<TopicName>();
+        }
+    }
+
+    return parsed;
+}
+
+// Seeking is not supported here: the callback always receives ResultOperationNotSupported and
+// `msgId` is ignored.
+void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
new file mode 100644
index 0000000000..6425687595
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include "BlockingQueue.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include "boost/enable_shared_from_this.hpp"
+#include "ConsumerImplBase.h"
+#include "lib/UnAckedMessageTrackerDisabled.h"
+#include <lib/Latch.h>
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+
+namespace pulsar {
+typedef boost::shared_ptr<Promise<Result, Consumer>> 
ConsumerSubResultPromisePtr;
+typedef boost::function<void(Result result)> ResultCallback;
+
+class MultiTopicsConsumerImpl;
+// Consumer that subscribes to several topics at once under a single subscription name.
+// It keeps one ConsumerImpl per topic partition in consumers_ and funnels every received message
+// into the shared messages_ queue.
+class MultiTopicsConsumerImpl : public ConsumerImplBase,
+                                public boost::enable_shared_from_this<MultiTopicsConsumerImpl> {
+   public:
+    // Lifecycle states of the multi-topics consumer.
+    enum MultiTopicsConsumerState
+    {
+        Pending,
+        Ready,
+        Closing,
+        Closed,
+        Failed
+    };
+    MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
+                            const std::string& subscriptionName, TopicNamePtr topicName,
+                            const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_);
+    virtual ~MultiTopicsConsumerImpl();
+    virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
+    virtual const std::string& getSubscriptionName() const;
+    virtual const std::string& getTopic() const;
+    virtual const std::string& getName() const;
+    virtual Result receive(Message& msg);
+    virtual Result receive(Message& msg, int timeout);
+    virtual void unsubscribeAsync(ResultCallback callback);
+    virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
+    // not supported
+    virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
+    virtual void closeAsync(ResultCallback callback);
+    virtual void start();
+    virtual void shutdown();
+    virtual bool isClosed();
+    virtual bool isOpen();
+    virtual Result pauseMessageListener();
+    virtual Result resumeMessageListener();
+    virtual void redeliverUnacknowledgedMessages();
+    virtual int getNumOfPrefetchedMessages() const;
+    virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
+    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
+                                size_t, BrokerConsumerStatsCallback);
+    // Returns the parsed name of the last topic when every topic name is valid and all topics
+    // share one namespace; returns a null pointer otherwise (including for an empty list).
+    static boost::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
+    void unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback);
+    Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
+    // not supported
+    virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
+
+   private:
+    const ClientImplPtr client_;
+    const std::string subscriptionName_;
+    std::string consumerStr_;
+    std::string topic_;
+    NamespaceNamePtr namespaceName_;
+    const ConsumerConfiguration conf_;
+    typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
+    // topic-partition name -> consumer for that partition
+    ConsumerMap consumers_;
+    // topic name -> number of partitions subscribed for it
+    std::map<std::string, int> topicsPartitions_;
+    boost::mutex mutex_;
+    MultiTopicsConsumerState state_;
+    // total number of partition consumers across all topics
+    boost::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    LookupServicePtr lookupServicePtr_;
+    BlockingQueue<Message> messages_;
+    ExecutorServicePtr listenerExecutor_;
+    MessageListener messageListener_;
+    Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
+    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    // Stored by value: a reference member would dangle as soon as the caller's vector goes out
+    // of scope while this consumer is still alive.
+    const std::vector<std::string> topics_;
+
+    /* methods */
+    void setState(MultiTopicsConsumerState state);
+    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
+
+    void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                              unsigned int partitionIndex);
+    void handleSingleConsumerClose(Result result, std::string& topicPartitionName, CloseCallback callback);
+    void notifyResult(CloseCallback closeCallback);
+    void messageReceived(Consumer consumer, const Message& msg);
+    void internalListener(Consumer consumer);
+    void receiveMessages();
+
+    void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
+                                  boost::shared_ptr<std::atomic<int>> topicsNeedCreate);
+    void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
+                                  TopicNamePtr topicName, const std::string& consumerName,
+                                  ConsumerConfiguration conf,
+                                  ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                     boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+                                     ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                 ResultCallback callback);
+    void handleOneTopicUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                         int numberPartitions, TopicNamePtr topicNamePtr,
+                                         std::string& topicPartitionName, ResultCallback callback);
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/lib/NamespaceName.cc 
b/pulsar-client-cpp/lib/NamespaceName.cc
index caa5b79409..273fc2234a 100644
--- a/pulsar-client-cpp/lib/NamespaceName.cc
+++ b/pulsar-client-cpp/lib/NamespaceName.cc
@@ -27,6 +27,7 @@
 #include <sstream>
 
 DECLARE_LOG_OBJECT()
+namespace pulsar {
 
 boost::shared_ptr<NamespaceName> NamespaceName::get(const std::string& 
property, const std::string& cluster,
                                                     const std::string& 
namespaceName) {
@@ -103,3 +104,7 @@ std::string NamespaceName::getCluster() { return 
this->cluster_; }
 std::string NamespaceName::getLocalName() { return this->localName_; }
 
 bool NamespaceName::isV2() { return this->cluster_.empty(); }
+
+// Returns a copy of the stored namespace_ string.
+std::string NamespaceName::toString() {
+    return namespace_;
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NamespaceName.h 
b/pulsar-client-cpp/lib/NamespaceName.h
index 1f121bb9bc..d5bdf99969 100644
--- a/pulsar-client-cpp/lib/NamespaceName.h
+++ b/pulsar-client-cpp/lib/NamespaceName.h
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 
 #pragma GCC visibility push(default)
+namespace pulsar {
 
 class NamespaceName : public ServiceUnitId {
    public:
@@ -38,6 +39,7 @@ class NamespaceName : public ServiceUnitId {
                                                 const std::string& 
namespaceName);
     bool operator==(const NamespaceName& namespaceName);
     bool isV2();
+    std::string toString();
 
    private:
     std::string namespace_;
@@ -51,6 +53,9 @@ class NamespaceName : public ServiceUnitId {
     NamespaceName(const std::string& property, const std::string& namespace_);
 };
 
+typedef boost::shared_ptr<NamespaceName> NamespaceNamePtr;
+
+}  // namespace pulsar
 #pragma GCC visibility pop
 
 #endif
diff --git a/pulsar-client-cpp/lib/TopicName.cc 
b/pulsar-client-cpp/lib/TopicName.cc
index 533f91eaee..91867521b1 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -228,4 +228,7 @@ const std::string TopicName::getTopicPartitionName(unsigned 
int partition) {
     topicPartitionName << toString() << 
PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
     return topicPartitionName.str();
 }
+
+// Returns the shared pointer held in namespaceName_.
+NamespaceNamePtr TopicName::getNamespaceName() {
+    return namespaceName_;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/TopicName.h 
b/pulsar-client-cpp/lib/TopicName.h
index 91b7994364..1949d9429d 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -51,6 +51,7 @@ class TopicName : public ServiceUnitId {
     std::string getLocalName();
     std::string getEncodedLocalName();
     std::string toString();
+    NamespaceNamePtr getNamespaceName();
     static boost::shared_ptr<TopicName> get(const std::string& topicName);
     bool operator==(const TopicName& other);
     static std::string getEncodedName(const std::string& nameBeforeEncoding);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
index 62cf86e5a5..c25c1a5b9d 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
@@ -26,6 +26,7 @@ class UnAckedMessageTrackerDisabled : public 
UnAckedMessageTrackerInterface {
     bool add(const MessageId& m) { return false; }
     bool remove(const MessageId& m) { return false; }
     void removeMessagesTill(const MessageId& msgId) {}
+    // No-op, like the other operations of this disabled tracker.
+    void removeTopicMessage(const std::string& topic) {}
 
     void clear() {}
 };
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 90006b6215..ba9fc97222 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -100,6 +100,26 @@ void 
UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     }
 }
 
+// Erases from `messages` every message id whose topic name contains `topic` as a substring.
+static void eraseMessagesOfTopic(std::set<MessageId>& messages, const std::string& topic) {
+    std::set<MessageId>::iterator cursor = messages.begin();
+    while (cursor != messages.end()) {
+        const std::string& ownerTopic = cursor->getTopicName();
+        if (ownerTopic.find(topic) == std::string::npos) {
+            cursor++;
+            continue;
+        }
+        // post-increment keeps a valid iterator to the next element across the erase
+        messages.erase(cursor++);
+    }
+}
+
+// This is only for MultiTopicsConsumerImpl: when a single topic is unsubscribed, all of its
+// messages are removed from both the old and the current set.
+void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
+    eraseMessagesOfTopic(oldSet_, topic);
+    eraseMessagesOfTopic(currentSet_, topic);
+}
+
+
 void UnAckedMessageTrackerEnabled::clear() {
     currentSet_.clear();
     oldSet_.clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 217ee0b62b..7bea00d120 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface {
     bool add(const MessageId& m);
     bool remove(const MessageId& m);
     void removeMessagesTill(const MessageId& msgId);
+    void removeTopicMessage(const std::string& topic);
     void timeoutHandler();
 
     void clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index d010dd03ea..798ccb429f 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -44,6 +44,9 @@ class UnAckedMessageTrackerInterface {
     virtual bool remove(const MessageId& m) = 0;
     virtual void removeMessagesTill(const MessageId& msgId) = 0;
     virtual void clear() = 0;
+    // This is only for MultiTopicsConsumerImpl: when a single topic is unsubscribed, all of its
+    // messages should be removed.
+    virtual void removeTopicMessage(const std::string& topic) = 0;
 };
 
 typedef boost::scoped_ptr<UnAckedMessageTrackerInterface> 
UnAckedMessageTrackerScopedPtr;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 023a58769d..cf28b34130 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -33,6 +33,7 @@
 #include "HttpHelper.h"
 #include <set>
 #include <vector>
+#include <lib/MultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
@@ -45,7 +46,6 @@ static int globalCount = 0;
 static long globalResendMessageCount = 0;
 static std::string lookupUrl = "pulsar://localhost:8885";
 static std::string adminUrl = "http://localhost:8765/";;
-
 static void messageListenerFunction(Consumer consumer, const Message& msg) {
     globalCount++;
     consumer.acknowledge(msg);
@@ -1508,3 +1508,174 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTimeoutListener) {
     producer.close();
     client.close();
 }
+
+TEST(BasicEndToEndTest, testMultiTopicsConsumerTopicNameInvalid) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsTopicNameInvalid";
+    // cluster empty
+    std::string topicName1 = 
"persistent://prop/testMultiTopicsTopicNameInvalid";
+
+    // empty topics
+    ASSERT_EQ(0, topicNames.size());
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, 
WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    LOG_INFO("subscribe on empty topics");
+    consumer.close();
+
+    // Invalid topic names
+    Consumer consumer1;
+    std::string subName1 = "testMultiTopicsTopicNameInvalid";
+    topicNames.push_back(topicName1);
+    Promise<Result, Consumer> consumerPromise1;
+    client.subscribeAsync(topicNames, subName1, consConfig, 
WaitForCallbackValue<Consumer>(consumerPromise1));
+    Future<Result, Consumer> consumerFuture1 = consumerPromise1.getFuture();
+    result = consumerFuture1.get(consumer1);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on TopicName1 failed");
+    consumer1.close();
+
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsDifferentNamespace";
+    std::string topicName1 = 
"persistent://prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1";
+    std::string topicName2 = 
"persistent://prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2";
+    std::string topicName3 = 
"persistent://prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3";
+
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call the admin API to make the topics partitioned
+    std::string url1 =
+        adminUrl + 
"admin/persistent/prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1/partitions";
+    std::string url2 =
+        adminUrl + 
"admin/persistent/prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2/partitions";
+    std::string url3 =
+        adminUrl + 
"admin/persistent/prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    // topics from different namespaces: subscribe is expected to fail
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, 
WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on topics with different names should fail");
+    consumer.close();
+
+    client.shutdown();
+}
+
+// Test subscribing to 3 topics using MultiTopicsConsumer
+TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsConsumer";
+    std::string topicName1 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer1";
+    std::string topicName2 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer2";
+    std::string topicName3 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer3";
+
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call the admin API to make the topics partitioned
+    std::string url1 = adminUrl + 
"admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
+    std::string url2 = adminUrl + 
"admin/persistent/prop/unit/ns/testMultiTopicsConsumer2/partitions";
+    std::string url3 = adminUrl + 
"admin/persistent/prop/unit/ns/testMultiTopicsConsumer3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 3 producers");
+
+    int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, 
WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on 3 topics");
+
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer1.send(msg));
+    }
+
+    msgContent = "msg-content2";
+    LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer2.send(msg));
+    }
+
+    msgContent = "msg-content3";
+    LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer3.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int i = 0; i < 3 * messageNumber; i++) {
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
diff --git a/pulsar-client-cpp/tests/NamespaceNameTest.cc 
b/pulsar-client-cpp/tests/NamespaceNameTest.cc
index 84ccb13eb5..5c214e23b0 100644
--- a/pulsar-client-cpp/tests/NamespaceNameTest.cc
+++ b/pulsar-client-cpp/tests/NamespaceNameTest.cc
@@ -19,6 +19,7 @@
 #include <NamespaceName.h>
 
 #include <gtest/gtest.h>
+using namespace pulsar;
 
 TEST(NamespaceNameTest, testNamespaceName) {
     boost::shared_ptr<NamespaceName> nn1 = NamespaceName::get("property", 
"cluster", "namespace");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to