This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 500e393 Cpp client: add multiTopicsConsumer (#1996)
500e393 is described below
commit 500e3936d520b1854a096e4ef71be7cb7edfc502
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Jul 24 05:25:45 2018 +0800
Cpp client: add multiTopicsConsumer (#1996)
PR #1103 added the Multi-Topics-Consumer to the Java client. This is catch-up
work to add it to the C++ client.
---
pulsar-client-cpp/include/pulsar/Client.h | 9 +
pulsar-client-cpp/include/pulsar/Consumer.h | 1 +
pulsar-client-cpp/include/pulsar/Message.h | 1 +
pulsar-client-cpp/include/pulsar/MessageId.h | 12 +
pulsar-client-cpp/lib/Client.cc | 24 +
pulsar-client-cpp/lib/ClientImpl.cc | 36 ++
pulsar-client-cpp/lib/ClientImpl.h | 3 +
pulsar-client-cpp/lib/MessageId.cc | 4 +
pulsar-client-cpp/lib/MessageIdImpl.h | 17 +-
pulsar-client-cpp/lib/MessageImpl.cc | 10 +-
pulsar-client-cpp/lib/MessageImpl.h | 11 +
.../lib/MultiTopicsBrokerConsumerStatsImpl.cc | 158 +++++
.../lib/MultiTopicsBrokerConsumerStatsImpl.h | 92 +++
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 644 +++++++++++++++++++++
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 133 +++++
pulsar-client-cpp/lib/NamespaceName.cc | 5 +
pulsar-client-cpp/lib/NamespaceName.h | 5 +
pulsar-client-cpp/lib/TopicName.cc | 3 +
pulsar-client-cpp/lib/TopicName.h | 1 +
.../lib/UnAckedMessageTrackerDisabled.h | 1 +
.../lib/UnAckedMessageTrackerEnabled.cc | 20 +
.../lib/UnAckedMessageTrackerEnabled.h | 1 +
.../lib/UnAckedMessageTrackerInterface.h | 3 +
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 173 +++++-
pulsar-client-cpp/tests/NamespaceNameTest.cc | 1 +
25 files changed, 1364 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Client.h
b/pulsar-client-cpp/include/pulsar/Client.h
index 07b4355..6a9e487 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,15 @@ class Client {
void subscribeAsync(const std::string& topic, const std::string&
consumerName,
const ConsumerConfiguration& conf, SubscribeCallback
callback);
+ Result subscribe(const std::vector<std::string>& topics, const
std::string& subscriptionName,
+ Consumer& consumer);
+ Result subscribe(const std::vector<std::string>& topics, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf, Consumer& consumer);
+ void subscribeAsync(const std::vector<std::string>& topics, const
std::string& subscriptionName,
+ SubscribeCallback callback);
+ void subscribeAsync(const std::vector<std::string>& topics, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf, SubscribeCallback
callback);
+
/**
* Create a topic reader with given {@code ReaderConfiguration} for
reading messages from the specified
* topic.
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h
b/pulsar-client-cpp/include/pulsar/Consumer.h
index 4272166..4486515 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -243,6 +243,7 @@ class Consumer {
friend class PulsarFriend;
friend class PulsarWrapper;
friend class PartitionedConsumerImpl;
+ friend class MultiTopicsConsumerImpl;
friend class ConsumerImpl;
friend class ClientImpl;
friend class ConsumerTest;
diff --git a/pulsar-client-cpp/include/pulsar/Message.h
b/pulsar-client-cpp/include/pulsar/Message.h
index aff0d94..a3b9af0 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -134,6 +134,7 @@ class Message {
proto::SingleMessageMetadata& singleMetadata);
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
+ friend class MultiTopicsConsumerImpl;
friend class MessageBuilder;
friend class ConsumerImpl;
friend class ProducerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h
b/pulsar-client-cpp/include/pulsar/MessageId.h
index dfe3a51..e9ff133 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -22,6 +22,7 @@
#include <iosfwd>
#include <stdint.h>
#include <boost/shared_ptr.hpp>
+//#include <lib/MessageIdImpl.h>
#pragma GCC visibility push(default)
@@ -51,6 +52,16 @@ class MessageId {
void serialize(std::string& result) const;
/**
+ * Get the topic Name
+ */
+ const std::string& getTopicName() const;
+
+ /**
+ * Set the topicName
+ */
+ void setTopicName(const std::string& topicName);
+
+ /**
* Deserialize a message id from a binary string
*/
static MessageId deserialize(const std::string& serializedMessageId);
@@ -71,6 +82,7 @@ class MessageId {
friend class Commands;
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
+ friend class MultiTopicsConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index 5936e48..bba3520 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -90,6 +90,30 @@ void Client::subscribeAsync(const std::string& topic, const
std::string& consume
impl_->subscribeAsync(topic, consumerName, conf, callback);
}
+/**
+ * Blocking subscribe to a list of topics using a default-constructed
+ * ConsumerConfiguration. Delegates to the overload taking an explicit configuration.
+ */
+Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                         Consumer& consumer) {
+    ConsumerConfiguration defaultConf;
+    return subscribe(topics, subscriptionName, defaultConf, consumer);
+}
+
+/**
+ * Blocking subscribe to a list of topics: starts the asynchronous subscription and
+ * waits on the resulting future, storing the created consumer in `consumer`.
+ */
+Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                         const ConsumerConfiguration& conf, Consumer& consumer) {
+    Promise<Result, Consumer> subscribed;
+    subscribeAsync(topics, subscriptionName, conf, WaitForCallbackValue<Consumer>(subscribed));
+
+    Future<Result, Consumer> pending = subscribed.getFuture();
+    const Result outcome = pending.get(consumer);
+    return outcome;
+}
+
+/**
+ * Asynchronous subscribe to a list of topics using a default-constructed
+ * ConsumerConfiguration. Delegates to the overload taking an explicit configuration.
+ */
+void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                            SubscribeCallback callback) {
+    ConsumerConfiguration defaultConf;
+    subscribeAsync(topics, subscriptionName, defaultConf, callback);
+}
+
+// Asynchronous subscribe to a list of topics with an explicit configuration.
+// The request, including `callback`, is handed unchanged to the ClientImpl
+// held in impl_.
+void Client::subscribeAsync(const std::vector<std::string>& topics, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeCallback callback) {
+ impl_->subscribeAsync(topics, subscriptionName, conf, callback);
+}
+
Result Client::createReader(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, Reader& reader) {
Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index 1d46cd9..3768926 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -24,6 +24,7 @@
#include "ReaderImpl.h"
#include "PartitionedProducerImpl.h"
#include "PartitionedConsumerImpl.h"
+#include "MultiTopicsConsumerImpl.h"
#include "SimpleLoggerImpl.h"
#include "Log4CxxLogger.h"
#include <boost/bind.hpp>
@@ -33,6 +34,7 @@
#include "boost/date_time/posix_time/posix_time.hpp"
#include <lib/HTTPLookupService.h>
#include <lib/TopicName.h>
+#include <algorithm>
DECLARE_LOG_OBJECT()
@@ -210,6 +212,40 @@ void ClientImpl::handleReaderMetadataLookup(const Result
result, const LookupDat
consumers_.push_back(reader->getConsumer());
}
+/**
+ * Create and start a MultiTopicsConsumerImpl for `topics`.
+ *
+ * The callback is invoked directly with ResultAlreadyClosed when the client is not
+ * Open, or with ResultInvalidTopicName when a non-empty topic list fails
+ * MultiTopicsConsumerImpl::topicNamesValid(). Otherwise it is bound to
+ * handleConsumerCreated on the new consumer's creation future.
+ */
+void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName,
+                                const ConsumerConfiguration& conf, SubscribeCallback callback) {
+    Lock lock(mutex_);
+    if (state_ != Open) {
+        lock.unlock();
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    }
+
+    TopicNamePtr topicNamePtr;
+    if (!topics.empty()) {
+        topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics);
+        if (!topicNamePtr) {
+            lock.unlock();
+            callback(ResultInvalidTopicName, Consumer());
+            return;
+        }
+
+        // Derive a synthetic topic name for the aggregate consumer from the validated name.
+        const std::string suffix = generateRandomName();
+        std::stringstream fakeTopicName;
+        fakeTopicName << topicNamePtr->toString() << "-TopicsConsumerFakeName-" << suffix;
+        topicNamePtr = TopicName::get(fakeTopicName.str());
+    }
+
+    ConsumerImplBasePtr multiConsumer = boost::make_shared<MultiTopicsConsumerImpl>(
+        shared_from_this(), topics, consumerName, topicNamePtr, conf, lookupServicePtr_);
+
+    multiConsumer->getConsumerCreatedFuture().addListener(boost::bind(
+        &ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, multiConsumer));
+    consumers_.push_back(multiConsumer);
+
+    // Release the client lock before starting the consumer.
+    lock.unlock();
+    multiConsumer->start();
+}
+
void ClientImpl::subscribeAsync(const std::string& topic, const std::string&
consumerName,
const ConsumerConfiguration& conf,
SubscribeCallback callback) {
TopicNamePtr topicName;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h
b/pulsar-client-cpp/lib/ClientImpl.h
index 5283b58..550298b 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -57,6 +57,9 @@ class ClientImpl : public
boost::enable_shared_from_this<ClientImpl> {
void subscribeAsync(const std::string& topic, const std::string&
consumerName,
const ConsumerConfiguration& conf, SubscribeCallback
callback);
+ void subscribeAsync(const std::vector<std::string>& topics, const
std::string& consumerName,
+ const ConsumerConfiguration& conf, SubscribeCallback
callback);
+
void createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, ReaderCallback
callback);
diff --git a/pulsar-client-cpp/lib/MessageId.cc
b/pulsar-client-cpp/lib/MessageId.cc
index c5314d8..53946f8 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -130,5 +130,9 @@ bool MessageId::operator==(const MessageId& other) const {
bool MessageId::operator!=(const MessageId& other) const { return !(*this ==
other); }
+// Returns the topic name reported by the underlying MessageIdImpl (impl_).
+const std::string& MessageId::getTopicName() const { return
impl_->getTopicName(); }
+
+// Forwards the topic name to the underlying MessageIdImpl (impl_).
+void MessageId::setTopicName(const std::string& topicName) { impl_->setTopicName(topicName); }
+
#pragma GCC visibility pop
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageIdImpl.h
b/pulsar-client-cpp/lib/MessageIdImpl.h
index a3fc171..ae33da4 100644
--- a/pulsar-client-cpp/lib/MessageIdImpl.h
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -25,12 +25,25 @@ namespace pulsar {
class MessageIdImpl {
public:
- MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1),
batchIndex_(-1) {}
+ MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1),
batchIndex_(-1), topicName_() {}
MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId,
int32_t batchIndex)
- : ledgerId_(ledgerId), entryId_(entryId), partition_(partition),
batchIndex_(batchIndex) {}
+ : ledgerId_(ledgerId),
+ entryId_(entryId),
+ partition_(partition),
+ batchIndex_(batchIndex),
+ topicName_() {}
const int64_t ledgerId_;
const int64_t entryId_;
const int32_t partition_;
const int32_t batchIndex_;
+
+    /**
+     * Returns the topic name previously attached with setTopicName().
+     *
+     * Both constructors leave topicName_ null, so an id that never had a topic
+     * attached yields an empty string instead of dereferencing a null pointer.
+     */
+    const std::string& getTopicName() {
+        static const std::string emptyTopicName;
+        return topicName_ ? *topicName_ : emptyTopicName;
+    }
+ // Records the ADDRESS of `topicName`; no copy is made. The referenced string
+ // therefore has to stay alive for as long as getTopicName() may be called.
+ void setTopicName(const std::string& topicName) { topicName_ = &topicName;
}
+
+ private:
+ const std::string* topicName_;
+ friend class MessageImpl;
+ friend class MultiTopicsConsumerImpl;
+ friend class UnAckedMessageTrackerEnabled;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc
b/pulsar-client-cpp/lib/MessageImpl.cc
index 569a30a..9b59eff 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -20,7 +20,7 @@
namespace pulsar {
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0) {}
+MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0),
topicName_() {}
const Message::StringMap& MessageImpl::properties() {
if (properties_.size() == 0) {
@@ -78,4 +78,12 @@ void MessageImpl::setPartitionKey(const std::string&
partitionKey) {
}
void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) {
metadata.set_event_time(eventTimestamp); }
+
+// Attaches a topic name to this message and to its messageId.
+// Only the address of `topicName` is kept here (no copy), so the referenced
+// string has to outlive later getTopicName() calls.
+void MessageImpl::setTopicName(const std::string& topicName) {
+ topicName_ = &topicName;
+ messageId.setTopicName(topicName);
+}
+
+// Returns the topic name attached with setTopicName(). The constructor leaves
+// topicName_ null, so a message without a topic yields an empty string rather
+// than dereferencing a null pointer.
+const std::string& MessageImpl::getTopicName() {
+    static const std::string emptyTopicName;
+    return topicName_ ? *topicName_ : emptyTopicName;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h
b/pulsar-client-cpp/lib/MessageImpl.h
index f3753b1..0ef63e8 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -43,6 +43,7 @@ class MessageImpl {
SharedBuffer payload;
MessageId messageId;
ClientConnection* cnx_;
+ const std::string* topicName_;
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
@@ -50,6 +51,16 @@ class MessageImpl {
uint64_t getPublishTimestamp() const;
uint64_t getEventTimestamp() const;
+ /**
+ * Get a valid topicName
+ */
+ const std::string& getTopicName();
+
+ /**
+ * Set a valid topicName
+ */
+ void setTopicName(const std::string& topicName);
+
friend class PulsarWrapper;
friend class MessageBuilder;
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
new file mode 100644
index 0000000..5220307
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <boost/date_time/local_time/local_time.hpp>
+#include <algorithm>
+#include <numeric>
+
+using namespace pulsar;
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::DELIMITER = ";";
+
+// Pre-sizes the stats list with `size` default-constructed entries.
+MultiTopicsBrokerConsumerStatsImpl::MultiTopicsBrokerConsumerStatsImpl(size_t size) : statsList_(size) {}
+
+// True only when every stored entry reports itself valid (true for an empty list).
+bool MultiTopicsBrokerConsumerStatsImpl::isValid() const {
+    for (const BrokerConsumerStats& stats : statsList_) {
+        if (!stats.isValid()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Streams a one-line, human-readable dump of the aggregated statistics.
+// Every value is obtained through the public getters of `obj`; note that the
+// field labelled "validTill_" prints the boolean returned by isValid().
+std::ostream& operator<<(std::ostream& os, const
MultiTopicsBrokerConsumerStatsImpl& obj) {
+ os << "\nMultiTopicsBrokerConsumerStatsImpl ["
+ << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " <<
obj.getMsgRateOut()
+ << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
+ << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
+ << ", consumerName_ = " << obj.getConsumerName()
+ << ", availablePermits_ = " << obj.getAvailablePermits()
+ << ", unackedMessages_ = " << obj.getUnackedMessages()
+ << ", blockedConsumerOnUnackedMsgs_ = " <<
obj.isBlockedConsumerOnUnackedMsgs()
+ << ", address_ = " << obj.getAddress() << ", connectedSince_ = " <<
obj.getConnectedSince()
+ << ", type_ = " << obj.getType() << ", msgRateExpired_ = " <<
obj.getMsgRateExpired()
+ << ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
+ return os;
+}
+
+// Sum of getMsgRateOut() over all stored entries.
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateOut() const {
+    double total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getMsgRateOut();
+    }
+    return total;
+}
+
+// Sum of getMsgThroughputOut() over all stored entries.
+double MultiTopicsBrokerConsumerStatsImpl::getMsgThroughputOut() const {
+    double total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getMsgThroughputOut();
+    }
+    return total;
+}
+
+// Sum of getMsgRateRedeliver() over all stored entries.
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateRedeliver() const {
+    double total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getMsgRateRedeliver();
+    }
+    return total;
+}
+
+// Concatenates the consumer name of every stored entry, each followed by DELIMITER.
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConsumerName() const {
+    std::string joined;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        joined += stats.getConsumerName() + DELIMITER;
+    }
+    return joined;
+}
+
+// Sum of getAvailablePermits() over all stored entries.
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getAvailablePermits() const {
+    uint64_t total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getAvailablePermits();
+    }
+    return total;
+}
+
+// Sum of getUnackedMessages() over all stored entries.
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getUnackedMessages() const {
+    uint64_t total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getUnackedMessages();
+    }
+    return total;
+}
+
+// Returns false for an empty stats list; otherwise returns isValid().
+// NOTE(review): the result is the validity flag, not anything derived from the
+// per-entry blocked state - confirm whether this is intentional or whether the
+// entries' own isBlockedConsumerOnUnackedMsgs() values should be combined.
+bool MultiTopicsBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs()
const {
+ if (statsList_.size() == 0) {
+ return false;
+ }
+
+ return isValid();
+}
+
+// Concatenates the address of every stored entry, each followed by DELIMITER.
+const std::string MultiTopicsBrokerConsumerStatsImpl::getAddress() const {
+    std::stringstream joined;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        joined << stats.getAddress() << DELIMITER;
+    }
+    return joined.str();
+}
+
+// Concatenates the connected-since value of every stored entry, each followed by DELIMITER.
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConnectedSince() const {
+    std::stringstream joined;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        joined << stats.getConnectedSince() << DELIMITER;
+    }
+    return joined.str();
+}
+
+// Type reported by the first stored entry; ConsumerExclusive when the list is empty.
+const ConsumerType MultiTopicsBrokerConsumerStatsImpl::getType() const {
+    if (statsList_.empty()) {
+        return ConsumerExclusive;
+    }
+    const BrokerConsumerStats& first = statsList_[0];
+    return first.getType();
+}
+
+// Sum of getMsgRateExpired() over all stored entries.
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateExpired() const {
+    double total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getMsgRateExpired();
+    }
+    return total;
+}
+
+// Sum of getMsgBacklog() over all stored entries.
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getMsgBacklog() const {
+    uint64_t total = 0;
+    for (const BrokerConsumerStats& stats : statsList_) {
+        total += stats.getMsgBacklog();
+    }
+    return total;
+}
+
+// Returns a copy of the entry stored at position `index`.
+// The index is not range-checked: it must satisfy 0 <= index < statsList_.size().
+BrokerConsumerStats
MultiTopicsBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) {
+ return statsList_[index];
+}
+
+// Overwrites the entry at position `index` with `stats`; the list is not grown.
+// The index is not range-checked: it must satisfy 0 <= index < statsList_.size().
+void MultiTopicsBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int
index) {
+ statsList_[index] = stats;
+}
+
+// Removes every stored entry, leaving the list with size 0.
+void MultiTopicsBrokerConsumerStatsImpl::clear() { statsList_.clear(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
new file mode 100644
index 0000000..568cda1
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+#define PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+
+#include <string.h>
+#include <iostream>
+#include <vector>
+#include <pulsar/Result.h>
+#include <boost/function.hpp>
+#include <boost/date_time/microsec_time_clock.hpp>
+#include <lib/BrokerConsumerStatsImpl.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
+#pragma GCC visibility push(default)
+namespace pulsar {
+/**
+ * BrokerConsumerStatsImplBase implementation that holds a list of
+ * BrokerConsumerStats and exposes aggregated getters over that list.
+ */
+class MultiTopicsBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
+ private:
+ std::vector<BrokerConsumerStats> statsList_;
+ static const std::string DELIMITER;
+
+ public:
+ /** Creates the stats list with `size` default-constructed entries */
+ MultiTopicsBrokerConsumerStatsImpl(size_t size);
+
+ /** Returns true if the Stats are still valid **/
+ virtual bool isValid() const;
+
+ /** Returns the rate of messages delivered to the consumer. msg/s */
+ virtual double getMsgRateOut() const;
+
+ /** Returns the throughput delivered to the consumer. bytes/s */
+ virtual double getMsgThroughputOut() const;
+
+ /** Returns the rate of messages redelivered by this consumer. msg/s */
+ virtual double getMsgRateRedeliver() const;
+
+ /** Returns the Name of the consumer */
+ virtual const std::string getConsumerName() const;
+
+ /** Returns the Number of available message permits for the consumer */
+ virtual uint64_t getAvailablePermits() const;
+
+ /** Returns the Number of unacknowledged messages for the consumer */
+ virtual uint64_t getUnackedMessages() const;
+
+ /** Returns true if the consumer is blocked due to unacked messages. */
+ virtual bool isBlockedConsumerOnUnackedMsgs() const;
+
+ /** Returns the Address of this consumer */
+ virtual const std::string getAddress() const;
+
+ /** Returns the Timestamp of connection */
+ virtual const std::string getConnectedSince() const;
+
+ /** Returns Whether this subscription is Exclusive or Shared or Failover */
+ virtual const ConsumerType getType() const;
+
+ /** Returns the rate of messages expired on this subscription. msg/s */
+ virtual double getMsgRateExpired() const;
+
+ /** Returns the Number of messages in the subscription backlog */
+ virtual uint64_t getMsgBacklog() const;
+
+ /** Returns the BrokerConsumerStats stored at position `index` of the list */
+ BrokerConsumerStats getBrokerConsumerStats(int index);
+
+ /** Stores `stats` at position `index` of the list */
+ void add(BrokerConsumerStats stats, int index);
+
+ /** Removes all stored stats */
+ void clear();
+
+ /** Streams a textual dump of the aggregated stats */
+ friend std::ostream &operator<<(std::ostream &os, const
MultiTopicsBrokerConsumerStatsImpl &obj);
+};
+typedef boost::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>
MultiTopicsBrokerConsumerStatsPtr;
+} // namespace pulsar
+#pragma GCC visibility pop
+#endif // PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000..7be197c
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Builds a multi-topics consumer in the Pending state; no subscription is
+// started here (see start()).
+//  - topicName may be null: topic_ then becomes "EmptyTopics" and
+//    namespaceName_ is left empty.
+//  - The listener executor and the message listener are taken from `client`
+//    and `conf` respectively.
+//  - An UnAckedMessageTrackerEnabled is installed only when
+//    conf.getUnAckedMessagesTimeoutMs() is non-zero; otherwise the disabled
+//    tracker is used.
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const
std::vector<std::string>& topics,
+ const std::string&
subscriptionName, TopicNamePtr topicName,
+ const ConsumerConfiguration&
conf,
+ const LookupServicePtr
lookupServicePtr)
+ : client_(client),
+ subscriptionName_(subscriptionName),
+ topic_(topicName ? topicName->toString() : "EmptyTopics"),
+ conf_(conf),
+ state_(Pending),
+ messages_(1000),
+ listenerExecutor_(client->getListenerExecutorProvider()->get()),
+ messageListener_(conf.getMessageListener()),
+ namespaceName_(topicName ? topicName->getNamespaceName() :
boost::shared_ptr<NamespaceName>()),
+ lookupServicePtr_(lookupServicePtr),
+ numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)),
+ topics_(topics) {
+ // Human-readable identity used as a prefix in log messages.
+ std::stringstream consumerStrStream;
+ consumerStrStream << "[Muti Topics Consumer: "
+ << "TopicName - " << topic_ << " - Subscription - " <<
subscriptionName << "]";
+ consumerStr_ = consumerStrStream.str();
+
+ // A timeout of 0 selects the disabled tracker.
+ if (conf.getUnAckedMessagesTimeoutMs() != 0) {
+ unAckedMessageTrackerPtr_.reset(
+ new
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client,
*this));
+ } else {
+ unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
+ }
+}
+
+// Starts the consumer. With no topics it completes the creation promise right
+// away (Pending -> Ready, or a failure when the state is not Pending); otherwise
+// it kicks off one asynchronous subscription per topic, all sharing a countdown
+// of topics still to be created.
+void MultiTopicsConsumerImpl::start() {
+    if (topics_.empty()) {
+        if (!compareAndSetState(Pending, Ready)) {
+            LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
+            multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+            return;
+        }
+        LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
+        multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+        return;
+    }
+
+    // Shared countdown, decremented by handleOneTopicSubscribed for each topic.
+    int topicCount = topics_.size();
+    boost::shared_ptr<std::atomic<int>> pendingTopics = boost::make_shared<std::atomic<int>>(topicCount);
+
+    for (const std::string& topic : topics_) {
+        subscribeOneTopicAsync(topic).addListener(
+            boost::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed, shared_from_this(), _1, _2,
+                        topic, pendingTopics));
+    }
+}
+
+/**
+ * Completion handler for the subscription of one topic started by start().
+ *
+ * @param result outcome of subscribing to `topic`
+ * @param consumer consumer handle delivered by the per-topic future (unused here)
+ * @param topic name of the topic whose subscription finished
+ * @param topicsNeedCreate shared countdown of topics whose subscription is still pending
+ *
+ * When the last pending topic finishes, the creation promise is completed:
+ * with this consumer if the state can move Pending -> Ready, otherwise with a
+ * failure after closing whatever was subscribed so far.
+ */
+void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
+                                                       const std::string& topic,
+                                                       boost::shared_ptr<std::atomic<int>> topicsNeedCreate) {
+    // Record a failure BEFORE touching the counter, so that whichever callback
+    // ends up finishing is guaranteed to observe the Failed state.
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+    } else {
+        LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
+    }
+
+    // fetch_sub returns the value held before the decrement; using it (instead of
+    // a separate load) elects exactly one callback as the finisher.
+    const int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+    if (previous != 1) {
+        return;  // other topics are still pending
+    }
+
+    if (compareAndSetState(Pending, Ready)) {
+        LOG_INFO("Successfully Subscribed to Topics");
+        if (!namespaceName_) {
+            namespaceName_ = TopicName::get(topic)->getNamespaceName();
+        }
+        multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+        return;
+    }
+
+    // The topic that finished last may itself have succeeded; never fail the
+    // promise with ResultOk.
+    const Result failure = (result != ResultOk) ? result : ResultUnknownError;
+    LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << failure);
+    // unsubscribed all of the successfully subscribed partitioned consumers
+    ResultCallback nullCallbackForCleanup = NULL;
+    closeAsync(nullCallbackForCleanup);
+    multiTopicsConsumerCreatedPromise_.setFailed(failure);
+}
+
+// Subscribes this consumer to a single topic and returns a future for the outcome.
+// The future fails immediately with
+//  - ResultInvalidTopicName when `topic` cannot be parsed, or when its namespace
+//    differs from namespaceName_ (only checked once namespaceName_ is set);
+//  - ResultAlreadyClosed when the consumer is Closed or Closing.
+// Otherwise a partition-metadata lookup is started and subscribeTopicPartitions
+// continues the work with the returned promise.
+Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const
std::string& topic) {
+ TopicNamePtr topicName;
+ ConsumerSubResultPromisePtr topicPromise =
boost::make_shared<Promise<Result, Consumer>>();
+ if (!(topicName = TopicName::get(topic))) {
+ LOG_ERROR("TopicName invalid: " << topic);
+ topicPromise->setFailed(ResultInvalidTopicName);
+ return topicPromise->getFuture();
+ }
+
+ if (namespaceName_ && !(*namespaceName_ ==
*(topicName->getNamespaceName()))) {
+ LOG_ERROR("TopicName namespace not the same with topicsConsumer.
wanted namespace: "
+ << namespaceName_->toString() << " this topic: " << topic);
+ topicPromise->setFailed(ResultInvalidTopicName);
+ return topicPromise->getFuture();
+ }
+
+ if (state_ == Closed || state_ == Closing) {
+ LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
+ topicPromise->setFailed(ResultAlreadyClosed);
+ return topicPromise->getFuture();
+ }
+
+ // subscribe for each partition, when all partitions completed, complete
promise
+ lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ boost::bind(&MultiTopicsConsumerImpl::subscribeTopicPartitions,
shared_from_this(), _1, _2, topicName,
+ subscriptionName_, conf_, topicPromise));
+ return topicPromise->getFuture();
+}
+
+// Listener for the partition-metadata lookup of one topic.
+// On lookup failure the per-topic promise is failed with `result`. Otherwise one
+// ConsumerImpl is created and started per partition (a reported partition count
+// below 1 is treated as a single partition), and handleSingleConsumerCreated
+// completes the promise once they have been created.
+// The `consumerName` and `conf` arguments are not read in this body;
+// subscriptionName_ and conf_ are used instead.
+void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
+ const
LookupDataResultPtr partitionMetadata,
+ TopicNamePtr topicName,
+ const std::string&
consumerName,
+ ConsumerConfiguration
conf,
+
ConsumerSubResultPromisePtr topicSubResultPromise) {
+ if (result != ResultOk) {
+ LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics
Subscribing- "
+ << consumerStr_ << " result: " << result)
+ topicSubResultPromise->setFailed(result);
+ return;
+ }
+
+ boost::shared_ptr<ConsumerImpl> consumer;
+ ConsumerConfiguration config;
+ ExecutorServicePtr internalListenerExecutor =
client_->getPartitionListenerExecutorProvider()->get();
+
+ // all the consumers should have same name.
+ config.setConsumerName(conf_.getConsumerName());
+ config.setConsumerType(conf_.getConsumerType());
+
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
+ // Messages from every partition consumer are routed to messageReceived().
+ config.setMessageListener(
+ boost::bind(&MultiTopicsConsumerImpl::messageReceived,
shared_from_this(), _1, _2));
+ config.setReceiverQueueSize(conf_.getReceiverQueueSize());
+
+ int numPartitions = partitionMetadata->getPartitions() >= 1 ?
partitionMetadata->getPartitions() : 1;
+
+ // Only the topicsPartitions_ update happens under mutex_.
+ Lock lock(mutex_);
+ topicsPartitions_.insert(std::make_pair(topicName->toString(),
numPartitions));
+ lock.unlock();
+ numberTopicPartitions_->fetch_add(numPartitions);
+
+ // Countdown shared by the creation callbacks of this topic's partitions.
+ boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
+ boost::make_shared<std::atomic<int>>(numPartitions);
+
+ for (int i = 0; i < numPartitions; i++) {
+ std::string topicPartitionName = topicName->getTopicPartitionName(i);
+ consumer = boost::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
+ internalListenerExecutor,
Partitioned);
+ consumer->getConsumerCreatedFuture().addListener(
+ boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(), _1, _2,
+ partitionsNeedCreate, topicSubResultPromise));
+ consumer->setPartitionIndex(i);
+ // NOTE(review): consumers_ is modified here after mutex_ was released -
+ // confirm this cannot race with other accesses to consumers_.
+ consumers_.insert(std::make_pair(topicPartitionName, consumer));
+ LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " <<
consumerStr_);
+ consumer->start();
+ }
+}
+
+/**
+ * Creation callback of one per-partition consumer of a topic.
+ *
+ * @param result outcome of creating that consumer
+ * @param consumerImplBaseWeakPtr handle delivered by the creation future (unused here)
+ * @param partitionsNeedCreate shared countdown of partition consumers still pending for the topic
+ * @param topicSubResultPromise per-topic promise: failed on the first error, fulfilled with this
+ *        consumer once the last partition consumer has been created
+ */
+void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
+    Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+    boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+    ConsumerSubResultPromisePtr topicSubResultPromise) {
+    if (state_ == Failed) {
+        // one of the consumer creation failed, and we are cleaning up
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        LOG_ERROR("Unable to create Consumer " << consumerStr_ << " state == Failed, result: " << result);
+        return;
+    }
+
+    // fetch_sub returns the value held before the decrement.
+    const int previous = partitionsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        topicSubResultPromise->setFailed(result);
+        LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
+        return;
+    }
+
+    LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
+              << "Partitions need to create - " << previous - 1);
+
+    // Decide completion from the fetch_sub result rather than a second load, so
+    // that exactly one callback fulfils the promise.
+    if (previous == 1) {
+        topicSubResultPromise->setValue(Consumer(shared_from_this()));
+    }
+}
+
+/**
+ * Unsubscribes every underlying consumer and reports the overall outcome
+ * through `callback`.
+ *
+ * The callback receives ResultAlreadyClosed when the consumer is already
+ * Closing or Closed. When there are no underlying consumers (for example the
+ * consumer was created with an empty topic list) the consumer is closed and
+ * the callback receives ResultOk immediately; otherwise the result is reported
+ * by handleUnsubscribedAsync after the last underlying consumer has answered.
+ */
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+    LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+
+    Lock lock(mutex_);
+    if (state_ == Closing || state_ == Closed) {
+        LOG_INFO(consumerStr_ << " already closed");
+        lock.unlock();
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    state_ = Closing;
+    lock.unlock();
+
+    if (consumers_.empty()) {
+        // No per-consumer callback will ever fire, so finish here; otherwise the
+        // caller would wait forever and the state would stay Closing.
+        setState(Closed);
+        callback(ResultOk);
+        return;
+    }
+
+    // Counts how many underlying consumers have answered so far.
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         ++consumer) {
+        LOG_DEBUG("Unsubcribing Consumer - " << consumer->first);
+        (consumer->second)
+            ->unsubscribeAsync(boost::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync,
+                                           shared_from_this(), _1, consumerUnsubed, callback));
+    }
+}
+
+/**
+ * Callback for the unsubscribe of one underlying consumer, issued by unsubscribeAsync().
+ *
+ * @param result outcome of that single unsubscribe
+ * @param consumerUnsubed shared count of underlying consumers that have answered
+ * @param callback user callback, invoked once after the last answer with ResultOk, or
+ *        ResultUnknownError if any unsubscribe failed
+ */
+void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
+                                                      boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                                      ResultCallback callback) {
+    // Record a failure BEFORE bumping the counter, so that the callback which
+    // finishes is guaranteed to observe the Failed state.
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " subscription - " << subscriptionName_);
+    }
+
+    const int total = numberTopicPartitions_->load();
+    // fetch_add returns the value held before the increment; using it (instead of
+    // a separate load) elects exactly one callback as the finisher.
+    const int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < total);
+    if (previous + 1 != total) {
+        return;  // still waiting for other consumers
+    }
+
+    LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
+    consumers_.clear();
+    topicsPartitions_.clear();
+    unAckedMessageTrackerPtr_->clear();
+
+    const Result overall = (state_ != Failed) ? ResultOk : ResultUnknownError;
+    setState(Closed);
+    callback(overall);
+}
+
+// Unsubscribes all partition consumers of a single topic.
+//
+// topic    - topic to drop; must be a key of topicsPartitions_
+// callback - receives ResultTopicNotFound when the topic is not subscribed, ResultAlreadyClosed when
+//            the consumer is closing/closed, ResultUnknownError when the topic name cannot be parsed
+//            or one of its partition consumers is missing, and otherwise the result computed by
+//            handleOneTopicUnsubscribedAsync once every partition has answered.
+void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
+    std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
+    if (it == topicsPartitions_.end()) {
+        LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
+                                                               << subscriptionName_);
+        callback(ResultTopicNotFound);
+        return;
+    }
+
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
+                                                                           << subscriptionName_);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    TopicNamePtr topicName;
+    if (!(topicName = TopicName::get(topic))) {
+        LOG_ERROR("TopicName invalid: " << topic);
+        callback(ResultUnknownError);
+        // Must stop here: topicName is null and is dereferenced below.
+        return;
+    }
+    int numberPartitions = it->second;
+
+    // Resolve every partition consumer before sending any request, so that a missing one is
+    // reported exactly once and without leaving the topic half-unsubscribed.
+    std::vector<std::string> partitionNames;
+    std::vector<ConsumerImplPtr> partitionConsumers;
+    for (int i = 0; i < numberPartitions; i++) {
+        std::string topicPartitionName = topicName->getTopicPartitionName(i);
+        std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+
+        if (consumers_.end() == iterator) {
+            LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
+            callback(ResultUnknownError);
+            // Must stop here: dereferencing the end iterator is undefined behaviour.
+            return;
+        }
+        partitionNames.push_back(topicPartitionName);
+        partitionConsumers.push_back(iterator->second);
+    }
+
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (size_t i = 0; i < partitionConsumers.size(); i++) {
+        partitionConsumers[i]->unsubscribeAsync(
+            boost::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(), _1,
+                        consumerUnsubed, numberPartitions, topicName, partitionNames[i], callback));
+    }
+}
+
+// Completion handler for the unsubscribe of one partition consumer of a single topic.
+//
+// result             - outcome of that partition's unsubscribe; a failure marks the consumer Failed
+// consumerUnsubed    - shared counter of partitions of this topic that have answered so far
+// numberPartitions   - number of partitions the topic has
+// topicNamePtr       - the topic being unsubscribed
+// topicPartitionName - key of the partition consumer inside consumers_
+// callback           - invoked once, after the last partition answered, with ResultOk, or
+//                      ResultUnknownError if the state is Failed at that point
+void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
+    Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions,
+    TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
+    // fetch_add returns the value before the increment; deciding completion from it (rather than
+    // re-reading the counter) ensures a single handler finishes the topic under concurrency.
+    const int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < numberPartitions);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " topicPartitionName - " << topicPartitionName);
+    }
+
+    LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
+
+    // Stop delivering from this partition and forget its consumer.
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        iterator->second->pauseMessageListener();
+        consumers_.erase(iterator);
+    }
+
+    if (previous + 1 == numberPartitions) {
+        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
+        std::map<std::string, int>::iterator it = topicsPartitions_.find(topicNamePtr->toString());
+        if (it != topicsPartitions_.end()) {
+            numberTopicPartitions_->fetch_sub(numberPartitions);
+            Lock lock(mutex_);
+            topicsPartitions_.erase(it);
+            lock.unlock();
+        }
+        if (state_ != Failed) {
+            callback(ResultOk);
+        } else {
+            callback(ResultUnknownError);
+        }
+        unAckedMessageTrackerPtr_->removeTopicMessage(topicNamePtr->toString());
+    }
+}
+
+// Closes every sub-consumer.
+//
+// callback - may be empty. When set it receives ResultAlreadyClosed if the consumer is already
+//            closing/closed or has no sub-consumers; otherwise it is invoked by
+//            handleSingleConsumerClose after the last sub-consumer has been closed.
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed "
+                  << " topic" << topic_ << " consumer - " << consumerStr_);
+        // Same guard as handleSingleConsumerClose: calling an empty boost::function throws.
+        if (!callback.empty()) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
+
+    setState(Closing);
+
+    if (consumers_.empty()) {
+        LOG_ERROR("TopicsConsumer have no consumers to close "
+                  << " topic" << topic_ << " subscription - " << subscriptionName_);
+        setState(Closed);
+        if (!callback.empty()) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
+    }
+
+    // close successfully subscribed consumers
+    // Iterate over a snapshot: handleSingleConsumerClose erases from consumers_, which would
+    // invalidate an iterator into the live map if a sub-consumer invoked its callback before
+    // returning.
+    const ConsumerMap consumers = consumers_;
+    for (ConsumerMap::const_iterator consumer = consumers.begin(); consumer != consumers.end();
+         ++consumer) {
+        std::string topicPartitionName = consumer->first;
+        ConsumerImplPtr consumerPtr = consumer->second;
+
+        consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
+                                            shared_from_this(), _1, topicPartitionName, callback));
+    }
+}
+
+// Completion handler for the close of one sub-consumer.
+//
+// result             - outcome of closing that sub-consumer; a failure marks the consumer Failed
+// topicPartitionName - key of the sub-consumer inside consumers_
+// callback           - may be empty. Invoked once, after the last sub-consumer was closed, with an
+//                      error if any close failed (the last consumer's own error when it has one,
+//                      ResultUnknownError otherwise) and ResultOk when all succeeded.
+void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
+                                                        CloseCallback callback) {
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        consumers_.erase(iterator);
+    }
+
+    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
+                                                      << numberTopicPartitions_->load());
+
+    // Decrement and test in one atomic step; the value returned by fetch_sub tells this handler
+    // whether it is the last one, independent of what other handlers do concurrently.
+    const int previous = numberTopicPartitions_->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
+                                                                 << result);
+    }
+
+    // closed all consumers
+    if (previous - 1 == 0) {
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        // Report a failure if any sub-consumer failed to close, not only when the last one did.
+        Result closeResult = result;
+        if (state_ != Failed) {
+            setState(Closed);
+        } else if (closeResult == ResultOk) {
+            closeResult = ResultUnknownError;
+        }
+
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+        if (!callback.empty()) {
+            callback(closeResult);
+        }
+    }
+}
+
+// Listener attached to the sub-consumers: tags the message with the topic of the consumer that
+// delivered it, queues it, and, when a user listener is configured, schedules its invocation.
+void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
+    LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
+                                                          << " message:" << msg.getDataAsString());
+    msg.impl_->setTopicName(consumer.getTopic());
+    messages_.push(msg);
+
+    if (!messageListener_) {
+        // No listener: the message stays queued until receive() picks it up.
+        return;
+    }
+
+    listenerExecutor_->postWork(
+        boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+}
+
+// Runs on the listener executor: takes one queued message and hands it to the user listener.
+// Exceptions derived from std::exception thrown by the listener are logged and swallowed.
+// The `consumer` argument is not used; the listener always receives this multi-topics consumer.
+void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
+    Message queuedMessage;
+    messages_.pop(queuedMessage);
+
+    try {
+        messageListener_(Consumer(shared_from_this()), queuedMessage);
+    } catch (const std::exception& error) {
+        LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << error.what());
+    }
+}
+
+// Blocks until a message is available and stores it in `msg`.
+//
+// Returns ResultAlreadyClosed when the consumer is not Ready, ResultInvalidConfiguration when a
+// message listener is configured, and ResultOk once a message has been dequeued.
+Result MultiTopicsConsumerImpl::receive(Message& msg) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+    // Release the mutex before blocking on the queue. Holding it while waiting for a message
+    // would stall every other user of mutex_ (setState, isOpen, unsubscribeAsync, ...) until a
+    // message happens to arrive.
+    lock.unlock();
+    messages_.pop(msg);
+
+    unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    return ResultOk;
+}
+
+// Waits up to `timeout` milliseconds for a message and stores it in `msg`.
+//
+// Returns ResultAlreadyClosed when the consumer is not Ready, ResultInvalidConfiguration when a
+// message listener is configured, ResultOk when a message was dequeued in time, and
+// ResultTimeout otherwise.
+Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+
+    // Release the mutex before waiting on the queue so that other users of mutex_ (setState,
+    // isOpen, unsubscribeAsync, ...) are not blocked for the whole timeout.
+    lock.unlock();
+
+    if (messages_.pop(msg, milliseconds(timeout))) {
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        return ResultOk;
+    } else {
+        return ResultTimeout;
+    }
+}
+
+// Acknowledges a message through the sub-consumer registered under the message's topic name.
+//
+// callback - receives ResultAlreadyClosed when the consumer is not Ready, ResultUnknownError when
+//            no sub-consumer is registered for the message's topic name, and otherwise whatever
+//            that sub-consumer's acknowledgeAsync reports.
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    const std::string& topicPartitionName = msgId.getTopicName();
+    std::map<std::string, ConsumerImplPtr>::iterator owner = consumers_.find(topicPartitionName);
+
+    if (consumers_.end() == owner) {
+        LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
+        callback(ResultUnknownError);
+        return;
+    }
+
+    // Stop tracking the message before delegating the acknowledgement.
+    unAckedMessageTrackerPtr_->remove(msgId);
+    owner->second->acknowledgeAsync(msgId, callback);
+}
+
+// Cumulative acknowledgement is not offered by this consumer: the callback always receives
+// ResultOperationNotSupported and `msgId` is ignored.
+void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
+
+// Nothing to release explicitly; members clean up through their own destructors.
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {
+}
+
+// Returns the future associated with multiTopicsConsumerCreatedPromise_.
+Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
+    return multiTopicsConsumerCreatedPromise_.getFuture();
+}
+// Returns the subscription name this consumer was constructed with.
+const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const {
+    return subscriptionName_;
+}
+
+// Returns the value of topic_.
+const std::string& MultiTopicsConsumerImpl::getTopic() const {
+    return topic_;
+}
+
+// Returns consumerStr_, the string also used to identify this consumer in log messages.
+const std::string& MultiTopicsConsumerImpl::getName() const {
+    return consumerStr_;
+}
+
+// Replaces the consumer state while holding mutex_.
+void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
+    Lock guard(mutex_);
+    state_ = state;
+}
+
+// Under mutex_, switches the state to `update` only if it currently equals `expect`.
+// Returns true when the switch happened, false when the state was left untouched.
+bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
+                                                 MultiTopicsConsumerState update) {
+    Lock guard(mutex_);
+    const bool matched = (state_ == expect);
+    if (matched) {
+        state_ = update;
+    }
+    return matched;
+}
+
+// Intentionally a no-op in this implementation.
+void MultiTopicsConsumerImpl::shutdown() {
+}
+
+// True when the state is Closed. Unlike isOpen(), state_ is read here without taking mutex_.
+bool MultiTopicsConsumerImpl::isClosed() {
+    return state_ == Closed;
+}
+
+// True when the state, read under mutex_, is Ready.
+bool MultiTopicsConsumerImpl::isOpen() {
+    Lock guard(mutex_);
+    const bool ready = (state_ == Ready);
+    return ready;
+}
+
+// Asks every sub-consumer to request messages on its connection, using the configured receiver
+// queue size as the amount.
+void MultiTopicsConsumerImpl::receiveMessages() {
+    ConsumerMap::const_iterator entry = consumers_.begin();
+    while (entry != consumers_.end()) {
+        ConsumerImplPtr partitionConsumer = entry->second;
+        partitionConsumer->receiveMessages(partitionConsumer->getCnx().lock(), conf_.getReceiverQueueSize());
+        LOG_DEBUG("Sending FLOW command for consumer - " << partitionConsumer->getConsumerId());
+        ++entry;
+    }
+}
+
+// Pauses the message listener of every sub-consumer.
+// Returns ResultInvalidConfiguration when no listener is configured, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::pauseMessageListener() {
+    if (messageListener_) {
+        for (ConsumerMap::const_iterator entry = consumers_.begin(); entry != consumers_.end(); ++entry) {
+            entry->second->pauseMessageListener();
+        }
+        return ResultOk;
+    }
+    return ResultInvalidConfiguration;
+}
+
+// Resumes the message listener of every sub-consumer.
+// Returns ResultInvalidConfiguration when no listener is configured, ResultOk otherwise.
+Result MultiTopicsConsumerImpl::resumeMessageListener() {
+    if (messageListener_) {
+        for (ConsumerMap::const_iterator entry = consumers_.begin(); entry != consumers_.end(); ++entry) {
+            entry->second->resumeMessageListener();
+        }
+        return ResultOk;
+    }
+    return ResultInvalidConfiguration;
+}
+
+// Forwards the redelivery request to every sub-consumer.
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+    ConsumerMap::const_iterator entry = consumers_.begin();
+    while (entry != consumers_.end()) {
+        entry->second->redeliverUnacknowledgedMessages();
+        ++entry;
+    }
+}
+
+// Number of messages currently waiting in the shared queue.
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const {
+    return messages_.size();
+}
+
+// Requests broker stats from every sub-consumer; handleGetConsumerStats collects the answers and
+// invokes `callback`.
+//
+// callback - receives ResultConsumerNotInitialized with empty stats when the consumer is not
+//            Ready; otherwise it is invoked from handleGetConsumerStats.
+void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        callback(ResultConsumerNotInitialized, BrokerConsumerStats());
+        return;
+    }
+    // Both the aggregate and the latch are sized from numberTopicPartitions_.
+    MultiTopicsBrokerConsumerStatsPtr aggregatedStats =
+        boost::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
+    LatchPtr pending = boost::make_shared<Latch>(numberTopicPartitions_->load());
+    int consumerCount = consumers_.size();
+    lock.unlock();
+
+    ConsumerMap::const_iterator entry = consumers_.begin();
+    int index = 0;
+    while (index < consumerCount) {
+        entry->second->getBrokerConsumerStatsAsync(
+            boost::bind(&MultiTopicsConsumerImpl::handleGetConsumerStats, shared_from_this(), _1, _2,
+                        pending, aggregatedStats, index, callback));
+        index++;
+        entry++;
+    }
+}
+
+// Collects the stats answer of one sub-consumer.
+//
+// res                 - outcome of that sub-consumer's request; any failure is passed straight to
+//                       `callback` together with empty stats
+// brokerConsumerStats - stats to store at position `index` of `statsPtr`
+// latchPtr            - counts the answers still missing; when it reaches zero `callback`
+//                       receives ResultOk and the aggregated stats
+void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
+                                                     LatchPtr latchPtr,
+                                                     MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
+                                                     BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (res != ResultOk) {
+        lock.unlock();
+        callback(res, BrokerConsumerStats());
+        return;
+    }
+
+    latchPtr->countdown();
+    statsPtr->add(brokerConsumerStats, index);
+
+    if (latchPtr->getCount() == 0) {
+        lock.unlock();
+        callback(ResultOk, BrokerConsumerStats(statsPtr));
+    }
+}
+
+// Checks that every entry of `topics` parses as a topic name and that all of them share one
+// namespace.
+//
+// Returns a null pointer as soon as one name fails to parse or a namespace differs, and also when
+// `topics` is empty. Otherwise returns the TopicName parsed from the last entry.
+boost::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(
+    const std::vector<std::string>& topics) {
+    TopicNamePtr parsedTopic;
+    NamespaceNamePtr expectedNamespace;
+
+    // all topics name valid, and all topics have same namespace
+    std::vector<std::string>::const_iterator name = topics.begin();
+    while (name != topics.end()) {
+        // topic name valid
+        parsedTopic = TopicName::get(*name);
+        if (!parsedTopic) {
+            LOG_ERROR("Topic name invalid when init " << *name);
+            return boost::shared_ptr<TopicName>();
+        }
+
+        // all contains same namespace part
+        if (!expectedNamespace) {
+            // The first topic defines the namespace the remaining ones are compared against.
+            expectedNamespace = parsedTopic->getNamespaceName();
+        } else if (!(*expectedNamespace == *(parsedTopic->getNamespaceName()))) {
+            LOG_ERROR("Different namespace name. expected: " << expectedNamespace->toString() << " now:"
+                                                             << parsedTopic->getNamespaceName()->toString());
+            return boost::shared_ptr<TopicName>();
+        }
+        name++;
+    }
+
+    return parsedTopic;
+}
+
+// Seek is not offered by this consumer: the callback always receives ResultOperationNotSupported
+// and `msgId` is ignored.
+void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
new file mode 100644
index 0000000..6425687
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include "BlockingQueue.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include "boost/enable_shared_from_this.hpp"
+#include "ConsumerImplBase.h"
+#include "lib/UnAckedMessageTrackerDisabled.h"
+#include <lib/Latch.h>
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+
+namespace pulsar {
+typedef boost::shared_ptr<Promise<Result, Consumer>>
ConsumerSubResultPromisePtr;
+typedef boost::function<void(Result result)> ResultCallback;
+
+class MultiTopicsConsumerImpl;
+// Consumer that presents several topics (each made of one or more partition consumers) as one
+// consumer. Messages from all sub-consumers are funnelled into a single queue and either handed
+// out through receive() or pushed to the configured message listener.
+class MultiTopicsConsumerImpl : public ConsumerImplBase,
+                                public boost::enable_shared_from_this<MultiTopicsConsumerImpl> {
+   public:
+    // Lifecycle states of the consumer as a whole.
+    enum MultiTopicsConsumerState
+    {
+        Pending,
+        Ready,
+        Closing,
+        Closed,
+        Failed
+    };
+    MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
+                            const std::string& subscriptionName, TopicNamePtr topicName,
+                            const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_);
+    virtual ~MultiTopicsConsumerImpl();
+    virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
+    virtual const std::string& getSubscriptionName() const;
+    virtual const std::string& getTopic() const;
+    virtual const std::string& getName() const;
+    // Blocking receive; fails when a message listener is configured.
+    virtual Result receive(Message& msg);
+    // Receive with a timeout in milliseconds; returns ResultTimeout when nothing arrived in time.
+    virtual Result receive(Message& msg, int timeout);
+    virtual void unsubscribeAsync(ResultCallback callback);
+    virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
+    // not supported
+    virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
+    virtual void closeAsync(ResultCallback callback);
+    virtual void start();
+    virtual void shutdown();
+    virtual bool isClosed();
+    virtual bool isOpen();
+    virtual Result pauseMessageListener();
+    virtual Result resumeMessageListener();
+    virtual void redeliverUnacknowledgedMessages();
+    virtual int getNumOfPrefetchedMessages() const;
+    virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
+    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
+                                size_t, BrokerConsumerStatsCallback);
+    // Returns a null pointer when a topic name is invalid or the topics do not share one
+    // namespace; otherwise returns the TopicName parsed from the last entry of `topics`.
+    static boost::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
+    void unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback);
+    Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
+    // not supported
+    virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
+
+   private:
+    const ClientImplPtr client_;
+    const std::string subscriptionName_;
+    std::string consumerStr_;
+    std::string topic_;
+    NamespaceNamePtr namespaceName_;
+    const ConsumerConfiguration conf_;
+    // Sub-consumers keyed by topic-partition name.
+    typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
+    ConsumerMap consumers_;
+    // Number of partitions per subscribed topic, keyed by topic name.
+    std::map<std::string, int> topicsPartitions_;
+    boost::mutex mutex_;
+    MultiTopicsConsumerState state_;
+    // Total number of partition consumers across all subscribed topics.
+    boost::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    LookupServicePtr lookupServicePtr_;
+    // Queue shared by all sub-consumers; filled by messageReceived().
+    BlockingQueue<Message> messages_;
+    ExecutorServicePtr listenerExecutor_;
+    MessageListener messageListener_;
+    Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
+    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    // Stored by value: a reference member would dangle as soon as the vector handed to the
+    // constructor is destroyed, while this object can outlive the caller's scope.
+    const std::vector<std::string> topics_;
+
+    /* methods */
+    void setState(MultiTopicsConsumerState state);
+    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
+
+    void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                              unsigned int partitionIndex);
+    void handleSingleConsumerClose(Result result, std::string& topicPartitionName, CloseCallback callback);
+    void notifyResult(CloseCallback closeCallback);
+    void messageReceived(Consumer consumer, const Message& msg);
+    void internalListener(Consumer consumer);
+    void receiveMessages();
+
+    void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
+                                  boost::shared_ptr<std::atomic<int>> topicsNeedCreate);
+    void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
+                                  TopicNamePtr topicName, const std::string& consumerName,
+                                  ConsumerConfiguration conf,
+                                  ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                     boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+                                     ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                 ResultCallback callback);
+    void handleOneTopicUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                         int numberPartitions, TopicNamePtr topicNamePtr,
+                                         std::string& topicPartitionName, ResultCallback callback);
+};
+
+} // namespace pulsar
+#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/lib/NamespaceName.cc
b/pulsar-client-cpp/lib/NamespaceName.cc
index caa5b79..273fc22 100644
--- a/pulsar-client-cpp/lib/NamespaceName.cc
+++ b/pulsar-client-cpp/lib/NamespaceName.cc
@@ -27,6 +27,7 @@
#include <sstream>
DECLARE_LOG_OBJECT()
+namespace pulsar {
boost::shared_ptr<NamespaceName> NamespaceName::get(const std::string&
property, const std::string& cluster,
const std::string&
namespaceName) {
@@ -103,3 +104,7 @@ std::string NamespaceName::getCluster() { return
this->cluster_; }
std::string NamespaceName::getLocalName() { return this->localName_; }
bool NamespaceName::isV2() { return this->cluster_.empty(); }
+
+std::string NamespaceName::toString() { return namespace_; }  // copy of the stored namespace_ string
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NamespaceName.h
b/pulsar-client-cpp/lib/NamespaceName.h
index 1f121bb..d5bdf99 100644
--- a/pulsar-client-cpp/lib/NamespaceName.h
+++ b/pulsar-client-cpp/lib/NamespaceName.h
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#pragma GCC visibility push(default)
+namespace pulsar {
class NamespaceName : public ServiceUnitId {
public:
@@ -38,6 +39,7 @@ class NamespaceName : public ServiceUnitId {
const std::string&
namespaceName);
bool operator==(const NamespaceName& namespaceName);
bool isV2();
+ std::string toString();
private:
std::string namespace_;
@@ -51,6 +53,9 @@ class NamespaceName : public ServiceUnitId {
NamespaceName(const std::string& property, const std::string& namespace_);
};
+typedef boost::shared_ptr<NamespaceName> NamespaceNamePtr;
+
+} // namespace pulsar
#pragma GCC visibility pop
#endif
diff --git a/pulsar-client-cpp/lib/TopicName.cc
b/pulsar-client-cpp/lib/TopicName.cc
index 533f91e..9186752 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -228,4 +228,7 @@ const std::string TopicName::getTopicPartitionName(unsigned
int partition) {
topicPartitionName << toString() <<
PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
return topicPartitionName.str();
}
+
+NamespaceNamePtr TopicName::getNamespaceName() { return this->namespaceName_; }  // shared handle
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/TopicName.h
b/pulsar-client-cpp/lib/TopicName.h
index 91b7994..1949d94 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -51,6 +51,7 @@ class TopicName : public ServiceUnitId {
std::string getLocalName();
std::string getEncodedLocalName();
std::string toString();
+ NamespaceNamePtr getNamespaceName();
static boost::shared_ptr<TopicName> get(const std::string& topicName);
bool operator==(const TopicName& other);
static std::string getEncodedName(const std::string& nameBeforeEncoding);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
index 62cf86e..c25c1a5 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
@@ -26,6 +26,7 @@ class UnAckedMessageTrackerDisabled : public
UnAckedMessageTrackerInterface {
bool add(const MessageId& m) { return false; }
bool remove(const MessageId& m) { return false; }
void removeMessagesTill(const MessageId& msgId) {}
+ void removeTopicMessage(const std::string& topic) {}
void clear() {}
};
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 90006b6..ba9fc97 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -100,6 +100,26 @@ void
UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
}
}
+// Only for MultiTopicsConsumerImpl: drops every tracked message whose topic name contains `topic`.
+void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
+    std::set<MessageId>::iterator oldIt = oldSet_.begin();
+    while (oldIt != oldSet_.end()) {
+        if (oldIt->getTopicName().find(topic) == std::string::npos) {
+            oldIt++;
+        } else {
+            oldSet_.erase(oldIt++);  // advance first, then erase the element left behind
+        }
+    }
+    std::set<MessageId>::iterator currentIt = currentSet_.begin();
+    while (currentIt != currentSet_.end()) {
+        if (currentIt->getTopicName().find(topic) == std::string::npos) {
+            currentIt++;
+        } else {
+            currentSet_.erase(currentIt++);  // advance first, then erase the element left behind
+        }
+    }
+}
+
void UnAckedMessageTrackerEnabled::clear() {
currentSet_.clear();
oldSet_.clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 217ee0b..7bea00d 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public
UnAckedMessageTrackerInterface {
bool add(const MessageId& m);
bool remove(const MessageId& m);
void removeMessagesTill(const MessageId& msgId);
+ void removeTopicMessage(const std::string& topic);
void timeoutHandler();
void clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index d010dd0..798ccb4 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -44,6 +44,9 @@ class UnAckedMessageTrackerInterface {
virtual bool remove(const MessageId& m) = 0;
virtual void removeMessagesTill(const MessageId& msgId) = 0;
virtual void clear() = 0;
+ // this is only for MultiTopicsConsumerImpl, when un-subscribe a single
topic, should remove all it's
+ // message.
+ virtual void removeTopicMessage(const std::string& topic) = 0;
};
typedef boost::scoped_ptr<UnAckedMessageTrackerInterface>
UnAckedMessageTrackerScopedPtr;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 023a587..cf28b34 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -33,6 +33,7 @@
#include "HttpHelper.h"
#include <set>
#include <vector>
+#include <lib/MultiTopicsConsumerImpl.h>
#include "lib/Future.h"
#include "lib/Utils.h"
DECLARE_LOG_OBJECT()
@@ -45,7 +46,6 @@ static int globalCount = 0;
static long globalResendMessageCount = 0;
static std::string lookupUrl = "pulsar://localhost:8885";
static std::string adminUrl = "http://localhost:8765/";
-
static void messageListenerFunction(Consumer consumer, const Message& msg) {
globalCount++;
consumer.acknowledge(msg);
@@ -1508,3 +1508,174 @@ TEST(BasicEndToEndTest,
testUnAckedMessageTimeoutListener) {
producer.close();
client.close();
}
+
+// Subscribing with an empty topic list succeeds; subscribing with a malformed topic name fails
+// with ResultInvalidTopicName.
+TEST(BasicEndToEndTest, testMultiTopicsConsumerTopicNameInvalid) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsTopicNameInvalid";
+    // cluster empty
+    std::string topicName1 = "persistent://prop/testMultiTopicsTopicNameInvalid";
+
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+
+    // Step 1: empty topics
+    ASSERT_EQ(0, topicNames.size());
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    LOG_INFO("subscribe on empty topics");
+    consumer.close();
+
+    // Step 2: invalid topic names
+    Consumer consumer1;
+    std::string subName1 = "testMultiTopicsTopicNameInvalid";
+    topicNames.push_back(topicName1);
+    Promise<Result, Consumer> consumerPromise1;
+    client.subscribeAsync(topicNames, subName1, consConfig, WaitForCallbackValue<Consumer>(consumerPromise1));
+    Future<Result, Consumer> consumerFuture1 = consumerPromise1.getFuture();
+    result = consumerFuture1.get(consumer1);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on TopicName1 failed");
+    consumer1.close();
+
+    client.shutdown();
+}
+
+// Subscribing to topics that live in different namespaces must fail with ResultInvalidTopicName.
+TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
+    Client client(lookupUrl);
+    std::string subName = "testMultiTopicsDifferentNamespace";
+    std::string topicName1 = "persistent://prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1";
+    std::string topicName2 = "persistent://prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2";
+    std::string topicName3 = "persistent://prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3";
+
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call admin api to make topics partitioned
+    std::string url1 =
+        adminUrl + "admin/persistent/prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1/partitions";
+    std::string url2 =
+        adminUrl + "admin/persistent/prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2/partitions";
+    std::string url3 =
+        adminUrl + "admin/persistent/prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3/partitions";
+
+    // 204 and 409 are the only status codes accepted from the admin call.
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    // subscribe to the three topics, each in a different namespace
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on topics with different names should fail");
+    consumer.close();
+
+    client.shutdown();
+}
+
+// Subscribes one MultiTopicsConsumer to 3 partitioned topics, publishes 100 messages to each
+// topic, then receives and acknowledges all 300 through that single consumer.
+TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
+    Client client(lookupUrl);
+    std::string subName = "testMultiTopicsConsumer";
+    std::string topicName1 = "persistent://prop/unit/ns/testMultiTopicsConsumer1";
+    std::string topicName2 = "persistent://prop/unit/ns/testMultiTopicsConsumer2";
+    std::string topicName3 = "persistent://prop/unit/ns/testMultiTopicsConsumer3";
+
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call admin api to make topics partitioned
+    std::string url1 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
+    std::string url2 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer2/partitions";
+    std::string url3 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer3/partitions";
+
+    // 204 and 409 are the only status codes accepted from the admin call.
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    // one producer per topic
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 3 producers");
+
+    int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on 3 topics");
+
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; ++msgNum) {
+        std::stringstream payload;
+        payload << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(payload.str()).build();
+        ASSERT_EQ(ResultOk, producer1.send(msg));
+    }
+
+    msgContent = "msg-content2";
+    LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; ++msgNum) {
+        std::stringstream payload;
+        payload << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(payload.str()).build();
+        ASSERT_EQ(ResultOk, producer2.send(msg));
+    }
+
+    msgContent = "msg-content3";
+    LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; ++msgNum) {
+        std::stringstream payload;
+        payload << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(payload.str()).build();
+        ASSERT_EQ(ResultOk, producer3.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int received = 0; received < 3 * messageNumber; ++received) {
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
diff --git a/pulsar-client-cpp/tests/NamespaceNameTest.cc
b/pulsar-client-cpp/tests/NamespaceNameTest.cc
index 84ccb13..5c214e2 100644
--- a/pulsar-client-cpp/tests/NamespaceNameTest.cc
+++ b/pulsar-client-cpp/tests/NamespaceNameTest.cc
@@ -19,6 +19,7 @@
#include <NamespaceName.h>
#include <gtest/gtest.h>
+using namespace pulsar;
TEST(NamespaceNameTest, testNamespaceName) {
boost::shared_ptr<NamespaceName> nn1 = NamespaceName::get("property",
"cluster", "namespace");