This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93e192f Cpp client: add PatternMultiTopicsConsumerImpl to support
regex subscribe (#2219)
93e192f is described below
commit 93e192f7b8ce6273c2d906edf0a19726fe7d455a
Author: Jia Zhai <[email protected]>
AuthorDate: Sat Aug 4 14:56:56 2018 +0800
Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe
(#2219)
In PRs #1279 and #1298 we added regex-based subscription. This is catch-up
work to add `PatternMultiTopicsConsumerImpl` to the cpp client.
---
pulsar-client-cpp/include/pulsar/Client.h | 16 ++
.../include/pulsar/ConsumerConfiguration.h | 10 +
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 42 ++++
pulsar-client-cpp/lib/BinaryProtoLookupService.h | 9 +
pulsar-client-cpp/lib/Client.cc | 24 ++
pulsar-client-cpp/lib/ClientConnection.cc | 86 ++++++-
pulsar-client-cpp/lib/ClientConnection.h | 13 +-
pulsar-client-cpp/lib/ClientImpl.cc | 58 +++++
pulsar-client-cpp/lib/ClientImpl.h | 8 +
pulsar-client-cpp/lib/Commands.cc | 18 ++
pulsar-client-cpp/lib/Commands.h | 1 +
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 6 +
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 +-
pulsar-client-cpp/lib/HTTPLookupService.cc | 112 +++++++--
pulsar-client-cpp/lib/HTTPLookupService.h | 9 +-
pulsar-client-cpp/lib/LookupService.h | 11 +
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 2 +-
.../lib/PatternMultiTopicsConsumerImpl.cc | 235 +++++++++++++++++++
.../lib/PatternMultiTopicsConsumerImpl.h | 76 ++++++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 257 +++++++++++++++++++++
pulsar-client-cpp/tests/BinaryLookupServiceTest.cc | 67 ++++++
21 files changed, 1032 insertions(+), 32 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Client.h
b/pulsar-client-cpp/include/pulsar/Client.h
index 6a9e487..1913851 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,9 @@ class Client {
void subscribeAsync(const std::string& topic, const std::string&
consumerName,
const ConsumerConfiguration& conf, SubscribeCallback
callback);
+ /**
+ * subscribe for multiple topics under the same namespace.
+ */
Result subscribe(const std::vector<std::string>& topics, const
std::string& subscriptionName,
Consumer& consumer);
Result subscribe(const std::vector<std::string>& topics, const
std::string& subscriptionName,
@@ -109,6 +112,19 @@ class Client {
const ConsumerConfiguration& conf, SubscribeCallback
callback);
/**
+ * subscribe for multiple topics, which match given regexPattern, under
the same namespace.
+ */
+ Result subscribeWithRegex(const std::string& regexPattern, const
std::string& consumerName,
+ Consumer& consumer);
+ Result subscribeWithRegex(const std::string& regexPattern, const
std::string& consumerName,
+ const ConsumerConfiguration& conf, Consumer&
consumer);
+
+ void subscribeWithRegexAsync(const std::string& regexPattern, const
std::string& consumerName,
+ SubscribeCallback callback);
+ void subscribeWithRegexAsync(const std::string& regexPattern, const
std::string& consumerName,
+ const ConsumerConfiguration& conf,
SubscribeCallback callback);
+
+ /**
* Create a topic reader with given {@code ReaderConfiguration} for
reading messages from the specified
* topic.
* <p>
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index c9584e3..36e5808 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -152,6 +152,16 @@ class ConsumerConfiguration {
bool isReadCompacted() const;
void setReadCompacted(bool compacted);
+ /**
+ * Set the period, in seconds, at which the
PatternMultiTopicsConsumer performs pattern auto
+ * discovery.
+ * The default value is 60 seconds. A value less than 0 disables auto
discovery.
+ *
+ * @param periodInSeconds period in seconds to do an auto discovery
+ */
+ void setPatternAutoDiscoveryPeriod(int periodInSeconds);
+ int getPatternAutoDiscoveryPeriod() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index c4bef30..296faee 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() {
Lock lock(mutex_);
return ++requestIdGenerator_;
}
+
+/**
+ * Asynchronously fetch the topics of a namespace over the binary protocol.
+ *
+ * A null nsName fails the returned future with ResultInvalidTopicName. Otherwise a
+ * connection for serviceUrl_ is requested from the pool and
+ * sendGetTopicsOfNamespaceRequest() is registered as the listener of that request.
+ *
+ * @param nsName namespace whose topics are requested
+ * @return future completed with the topic list or a failure Result
+ */
+Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr& nsName) {
+    NamespaceTopicsPromisePtr topicsPromise = boost::make_shared<Promise<Result, NamespaceTopicsPtr>>();
+    if (nsName) {
+        const std::string nsString = nsName->toString();
+        cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_)
+            .addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
+                                     nsString, _1, _2, topicsPromise));
+    } else {
+        topicsPromise->setFailed(ResultInvalidTopicName);
+    }
+    return topicsPromise->getFuture();
+}
+
+/**
+ * Connection-ready listener: sends the GetTopicsOfNamespace request on the connection.
+ *
+ * @param nsName    namespace name to query
+ * @param result    outcome of the connection attempt; anything but ResultOk fails the
+ *                  promise with ResultConnectError
+ * @param clientCnx weak reference to the connection to send the request on
+ * @param promise   promise completed by getTopicsOfNamespaceListener() or failed here
+ */
+void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
+                                                               const ClientConnectionWeakPtr& clientCnx,
+                                                               NamespaceTopicsPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(ResultConnectError);
+        return;
+    }
+
+    // Only a weak reference is held, so the connection may already be gone. Fail the
+    // promise instead of dereferencing an empty pointer.
+    ClientConnectionPtr conn = clientCnx.lock();
+    if (!conn) {
+        LOG_ERROR("Connection no longer available for GetTopicsOfNamespace request. nsName: " << nsName);
+        promise->setFailed(ResultConnectError);
+        return;
+    }
+
+    uint64_t requestId = newRequestId();
+    LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);
+
+    conn->newGetTopicsOfNamespace(nsName, requestId)
+        .addListener(
+            boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, _2, promise));
+}
+
+/**
+ * Response listener for a GetTopicsOfNamespace request.
+ *
+ * Completes the promise with topicsPtr when result is ResultOk; any other result is
+ * reported to the caller as ResultLookupError.
+ */
+void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
+                                                            NamespaceTopicsPromisePtr promise) {
+    if (result == ResultOk) {
+        promise->setValue(topicsPtr);
+    } else {
+        promise->setFailed(ResultLookupError);
+    }
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index f647c62..36fb5e8 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService {
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName);
+ Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName);
+
private:
boost::mutex mutex_;
uint64_t requestIdGenerator_;
@@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr&
clientCnx,
LookupDataResultPromisePtr promise);
+ void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result
result,
+ const ClientConnectionWeakPtr&
clientCnx,
+ NamespaceTopicsPromisePtr promise);
+
+ void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr
topicsPtr,
+ NamespaceTopicsPromisePtr promise);
+
uint64_t newRequestId();
};
typedef boost::shared_ptr<BinaryProtoLookupService>
BinaryProtoLookupServicePtr;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index bba3520..5cfe01f 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -114,6 +114,30 @@ void Client::subscribeAsync(const
std::vector<std::string>& topics, const std::s
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
}
+// Blocking regex subscribe using a default-constructed ConsumerConfiguration.
+Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
+                                  Consumer& consumer) {
+    const ConsumerConfiguration defaultConf;
+    return subscribeWithRegex(regexPattern, subscriptionName, defaultConf, consumer);
+}
+
+// Blocking regex subscribe: starts the asynchronous subscribe and waits on its future.
+// On return, `consumer` holds whatever value the future delivered.
+Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, Consumer& consumer) {
+    Promise<Result, Consumer> subscribed;
+    subscribeWithRegexAsync(regexPattern, subscriptionName, conf, WaitForCallbackValue<Consumer>(subscribed));
+    return subscribed.getFuture().get(consumer);
+}
+
+// Asynchronous regex subscribe using a default-constructed ConsumerConfiguration.
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
+                                     SubscribeCallback callback) {
+    const ConsumerConfiguration defaultConf;
+    subscribeWithRegexAsync(regexPattern, subscriptionName, defaultConf, callback);
+}
+
+// Asynchronous regex subscribe; forwards all arguments unchanged to the client
+// implementation.
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
+                                     const ConsumerConfiguration& conf, SubscribeCallback callback) {
+    impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback);
+}
+
Result Client::createReader(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, Reader& reader) {
Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 4e6d0f2..da30707 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const
CommandConnected& cmdConnecte
}
void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t>
consumerStatsRequests) {
- std::vector<Promise<Result, BrokerConsumerStatsImpl> >
consumerStatsPromises;
+ std::vector<Promise<Result, BrokerConsumerStatsImpl>>
consumerStatsPromises;
Lock lock(mutex_);
for (int i = 0; i < consumerStatsRequests.size(); i++) {
@@ -856,6 +856,7 @@ void ClientConnection::handleIncomingCommand() {
<< " -- req_id: " <<
error.request_id());
Lock lock(mutex_);
+
PendingRequestsMap::iterator it =
pendingRequests_.find(error.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
@@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() {
requestData.promise.setFailed(getResult(error.error()));
requestData.timer->cancel();
} else {
- PendingGetLastMessageIdRequestsMap::iterator it2 =
+ PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
- if (it2 != pendingGetLastMessageIdRequests_.end()) {
- Promise<Result, MessageId> getLastMessageIdPromise
= it2->second;
- pendingGetLastMessageIdRequests_.erase(it2);
+ if (it != pendingGetLastMessageIdRequests_.end()) {
+ Promise<Result, MessageId> getLastMessageIdPromise
= it->second;
+ pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
getLastMessageIdPromise.setFailed(getResult(error.error()));
} else {
- lock.unlock();
+ PendingGetNamespaceTopicsMap::iterator it =
+
pendingGetNamespaceTopicsRequests_.find(error.request_id());
+ if (it !=
pendingGetNamespaceTopicsRequests_.end()) {
+ Promise<Result, NamespaceTopicsPtr>
getNamespaceTopicsPromise = it->second;
+ pendingGetNamespaceTopicsRequests_.erase(it);
+ lock.unlock();
+
+
getNamespaceTopicsPromise.setFailed(getResult(error.error()));
+ } else {
+ lock.unlock();
+ }
}
}
-
break;
}
@@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() {
break;
}
+ case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
+ const CommandGetTopicsOfNamespaceResponse& response =
+ incomingCmd_.gettopicsofnamespaceresponse();
+
+ LOG_DEBUG(cnxString_ << "Received
GetTopicsOfNamespaceResponse from server. req_id: "
+ << response.request_id() << "
topicsSize" << response.topics_size());
+
+ Lock lock(mutex_);
+ PendingGetNamespaceTopicsMap::iterator it =
+
pendingGetNamespaceTopicsRequests_.find(response.request_id());
+
+ if (it != pendingGetNamespaceTopicsRequests_.end()) {
+ Promise<Result, NamespaceTopicsPtr> getTopicsPromise =
it->second;
+ pendingGetNamespaceTopicsRequests_.erase(it);
+ lock.unlock();
+
+ int numTopics = response.topics_size();
+ std::set<std::string> topicSet;
+ // get all topics
+ for (int i = 0; i < numTopics; i++) {
+ // remove partition part
+ const std::string& topicName = response.topics(i);
+ int pos = topicName.find("-partition-");
+ std::string filteredName = topicName.substr(0,
pos);
+
+ // filter duped topic name
+ if (topicSet.find(filteredName) == topicSet.end())
{
+ topicSet.insert(filteredName);
+ }
+ }
+
+ NamespaceTopicsPtr topicsPtr =
+
boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+ getTopicsPromise.setValue(topicsPtr);
+ } else {
+ lock.unlock();
+ LOG_WARN(
+ "GetTopicsOfNamespaceResponse command - Received
unknown request id from "
+ "server: "
+ << response.request_id());
+ }
+ break;
+ }
+
default: {
LOG_WARN(cnxString_ << "Received invalid message from
server");
close();
@@ -1281,4 +1336,21 @@ Future<Result, MessageId>
ClientConnection::newGetLastMessageId(uint64_t consume
return promise.getFuture();
}
+/**
+ * Register a pending GetTopicsOfNamespace request and send it to the broker.
+ *
+ * If the connection is closed, the returned future fails with ResultNotConnected and
+ * nothing is sent. The closed check and the pending-map insert happen under mutex_;
+ * logging and sending happen after the lock is released.
+ *
+ * @param nsName    namespace name to query
+ * @param requestId id under which the pending promise is stored
+ */
+Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
+                                                                             uint64_t requestId) {
+    Promise<Result, NamespaceTopicsPtr> topicsPromise;
+
+    Lock lock(mutex_);
+    const bool closed = isClosed();
+    if (!closed) {
+        pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, topicsPromise));
+    }
+    lock.unlock();
+
+    if (closed) {
+        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        topicsPromise.setFailed(ResultNotConnected);
+        return topicsPromise.getFuture();
+    }
+
+    sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
+    return topicsPromise.getFuture();
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index 860ca6a..bdbc8e5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -70,6 +70,8 @@ struct OpSendMsg;
typedef std::pair<std::string, int64_t> ResponseData;
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+
class ClientConnection : public
boost::enable_shared_from_this<ClientConnection> {
enum State
{
@@ -81,7 +83,7 @@ class ClientConnection : public
boost::enable_shared_from_this<ClientConnection>
public:
typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
- typedef
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> >
TlsSocketPtr;
+ typedef
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>>
TlsSocketPtr;
typedef boost::shared_ptr<ClientConnection> ConnectionPtr;
typedef boost::function<void(const boost::system::error_code&,
ConnectionPtr)> ConnectionListener;
typedef std::vector<ConnectionListener>::iterator ListenerIterator;
@@ -144,6 +146,8 @@ class ClientConnection : public
boost::enable_shared_from_this<ClientConnection>
Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId,
uint64_t requestId);
+ Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const
std::string& nsName, uint64_t requestId);
+
private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
@@ -264,12 +268,15 @@ class ClientConnection : public
boost::enable_shared_from_this<ClientConnection>
typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;
- typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> >
PendingConsumerStatsMap;
+ typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>>
PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;
- typedef std::map<long, Promise<Result, MessageId> >
PendingGetLastMessageIdRequestsMap;
+ typedef std::map<long, Promise<Result, MessageId>>
PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
+ typedef std::map<long, Promise<Result, NamespaceTopicsPtr>>
PendingGetNamespaceTopicsMap;
+ PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
+
boost::mutex mutex_;
typedef boost::unique_lock<boost::mutex> Lock;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index 3768926..ec113fc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -25,6 +25,7 @@
#include "PartitionedProducerImpl.h"
#include "PartitionedConsumerImpl.h"
#include "MultiTopicsConsumerImpl.h"
+#include "PatternMultiTopicsConsumerImpl.h"
#include "SimpleLoggerImpl.h"
#include "Log4CxxLogger.h"
#include <boost/bind.hpp>
@@ -35,6 +36,7 @@
#include <lib/HTTPLookupService.h>
#include <lib/TopicName.h>
#include <algorithm>
+#include <regex>
DECLARE_LOG_OBJECT()
@@ -119,6 +121,9 @@ ExecutorServiceProviderPtr
ClientImpl::getListenerExecutorProvider() { return li
ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
return partitionListenerExecutorProvider_;
}
+
+// Accessor for the lookup service held by this client.
+LookupServicePtr ClientImpl::getLookup() {
+    return lookupServicePtr_;
+}
+
void ClientImpl::createProducerAsync(const std::string& topic,
ProducerConfiguration conf,
CreateProducerCallback callback) {
TopicNamePtr topicName;
@@ -212,6 +217,59 @@ void ClientImpl::handleReaderMetadataLookup(const Result
result, const LookupDat
consumers_.push_back(reader->getConsumer());
}
+/**
+ * Start a regex subscription.
+ *
+ * The pattern is parsed with TopicName::get() to obtain its namespace. The callback
+ * receives ResultAlreadyClosed when the client is not open, or ResultInvalidTopicName
+ * when the pattern does not parse. Otherwise the namespace's topics are looked up and
+ * createPatternMultiTopicsConsumer() continues once the lookup completes.
+ */
+void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
+                                         const ConsumerConfiguration& conf, SubscribeCallback callback) {
+    TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
+
+    // Read the client state under the lock; invoke callbacks only after releasing it.
+    Lock lock(mutex_);
+    const bool clientOpen = (state_ == Open);
+    lock.unlock();
+
+    if (!clientOpen) {
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    }
+    if (!topicNamePtr) {
+        LOG_ERROR("Topic pattern not valid: " << regexPattern);
+        callback(ResultInvalidTopicName, Consumer());
+        return;
+    }
+
+    lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName())
+        .addListener(boost::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), _1, _2,
+                                 regexPattern, consumerName, conf, callback));
+}
+
+/**
+ * Lookup listener: builds a PatternMultiTopicsConsumerImpl from the namespace's topics.
+ *
+ * @param result       outcome of the topics-of-namespace lookup; on failure it is passed
+ *                     to the callback unchanged
+ * @param topics       topics returned by the lookup
+ * @param regexPattern pattern used to filter `topics`
+ * @param consumerName passed through to the consumer constructor
+ * @param conf         consumer configuration
+ * @param callback     invoked with an empty Consumer on failure; on success it is handed
+ *                     to handleConsumerCreated() through the consumer-created future
+ */
+void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
+                                                  const std::string& regexPattern,
+                                                  const std::string& consumerName,
+                                                  const ConsumerConfiguration& conf,
+                                                  SubscribeCallback callback) {
+    if (result != ResultOk) {
+        LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result);
+        callback(result, Consumer());
+        return;
+    }
+
+    // A lookup may complete with ResultOk but no topic list; do not dereference a null
+    // pointer below.
+    if (!topics) {
+        LOG_ERROR("Empty topics result while createPatternMultiTopicsConsumer for pattern: " << regexPattern);
+        callback(ResultLookupError, Consumer());
+        return;
+    }
+
+    NamespaceTopicsPtr matchTopics;
+    try {
+        // std::regex throws std::regex_error on a malformed pattern. This runs inside an
+        // asynchronous listener, so report the failure through the callback instead of
+        // letting the exception escape.
+        std::regex pattern(regexPattern);
+        matchTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);
+    } catch (const std::regex_error& e) {
+        LOG_ERROR("Topic pattern not valid: " << regexPattern << " -- " << e.what());
+        callback(ResultInvalidTopicName, Consumer());
+        return;
+    }
+
+    ConsumerImplBasePtr consumer = boost::make_shared<PatternMultiTopicsConsumerImpl>(
+        shared_from_this(), regexPattern, *matchTopics, consumerName, conf, lookupServicePtr_);
+
+    consumer->getConsumerCreatedFuture().addListener(
+        boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer));
+    Lock lock(mutex_);
+    consumers_.push_back(consumer);
+    lock.unlock();
+    consumer->start();
+}
+
void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const
std::string& consumerName,
const ConsumerConfiguration& conf,
SubscribeCallback callback) {
TopicNamePtr topicNamePtr;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h
b/pulsar-client-cpp/lib/ClientImpl.h
index 550298b..54d459d 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -60,6 +60,9 @@ class ClientImpl : public
boost::enable_shared_from_this<ClientImpl> {
void subscribeAsync(const std::vector<std::string>& topics, const
std::string& consumerName,
const ConsumerConfiguration& conf, SubscribeCallback
callback);
+ void subscribeWithRegexAsync(const std::string& regexPattern, const
std::string& consumerName,
+ const ConsumerConfiguration& conf,
SubscribeCallback callback);
+
void createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, ReaderCallback
callback);
@@ -82,6 +85,7 @@ class ClientImpl : public
boost::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getIOExecutorProvider();
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
+ LookupServicePtr getLookup();
friend class PulsarFriend;
private:
@@ -106,6 +110,10 @@ class ClientImpl : public
boost::enable_shared_from_this<ClientImpl> {
void handleClose(Result result, SharedInt remaining, ResultCallback
callback);
+ void createPatternMultiTopicsConsumer(const Result result, const
NamespaceTopicsPtr topics,
+ const std::string& regexPattern,
const std::string& consumerName,
+ const ConsumerConfiguration& conf,
SubscribeCallback callback);
+
enum State
{
Open,
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index 13bf99a..8a1933b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -324,6 +324,18 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t
consumerId, uint64_t request
return buffer;
}
+// Builds the serialized GET_TOPICS_OF_NAMESPACE command for the given namespace name
+// and request id.
+SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) {
+    BaseCommand baseCmd;
+    baseCmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
+
+    CommandGetTopicsOfNamespace* request = baseCmd.mutable_gettopicsofnamespace();
+    request->set_namespace_(nsName);
+    request->set_request_id(requestId);
+
+    const SharedBuffer serialized = writeMessageWithSize(baseCmd);
+    baseCmd.clear_gettopicsofnamespace();
+    return serialized;
+}
+
std::string Commands::messageType(BaseCommand_Type type) {
switch (type) {
case BaseCommand::CONNECT:
@@ -416,6 +428,12 @@ std::string Commands::messageType(BaseCommand_Type type) {
case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
return "GET_LAST_MESSAGE_ID_RESPONSE";
break;
+ case BaseCommand::GET_TOPICS_OF_NAMESPACE:
+ return "GET_TOPICS_OF_NAMESPACE";
+ break;
+ case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
+ return "GET_TOPICS_OF_NAMESPACE_RESPONSE";
+ break;
};
}
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 53fb1bb..d9b8589 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -112,6 +112,7 @@ class Commands {
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const
MessageId& messageId);
static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t
requestId);
+ static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
uint64_t requestId);
private:
Commands();
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 0c145c1..058ca57 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -105,4 +105,10 @@ bool ConsumerConfiguration::isReadCompacted() const {
return impl_->readCompacte
void ConsumerConfiguration::setReadCompacted(bool compacted) {
impl_->readCompacted = compacted; }
+// Stores the pattern auto-discovery period (in seconds) in the configuration impl.
+void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) {
+    impl_->patternAutoDiscoveryPeriod = periodInSeconds;
+}
+
+// Returns the pattern auto-discovery period stored in the configuration impl.
+int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const {
+    return impl_->patternAutoDiscoveryPeriod;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index eb0c374..0cc0c72 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -35,6 +35,7 @@ struct ConsumerConfigurationImpl {
CryptoKeyReaderPtr cryptoKeyReader;
ConsumerCryptoFailureAction cryptoFailureAction;
bool readCompacted;
+ int patternAutoDiscoveryPeriod;
ConsumerConfigurationImpl()
: unAckedMessagesTimeoutMs(0),
consumerType(ConsumerExclusive),
@@ -45,7 +46,8 @@ struct ConsumerConfigurationImpl {
maxTotalReceiverQueueSizeAcrossPartitions(50000),
cryptoKeyReader(),
cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
- readCompacted(false) {}
+ readCompacted(false),
+ patternAutoDiscoveryPeriod(60) {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 36d11f5..fe27e8d 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -68,8 +68,9 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::lookupAsync(const std::st
<< topicName->getEncodedLocalName();
}
-
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
shared_from_this(),
- promise,
completeUrlStream.str(), Lookup));
+
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+ shared_from_this(),
promise, completeUrlStream.str(),
+ Lookup));
return promise.getFuture();
}
@@ -89,8 +90,27 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync
<< '/' << PARTITION_METHOD_NAME;
}
-
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
shared_from_this(),
- promise,
completeUrlStream.str(), PartitionMetaData));
+
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+ shared_from_this(),
promise, completeUrlStream.str(),
+ PartitionMetaData));
+ return promise.getFuture();
+}
+
+/**
+ * Asynchronously fetch the topics of a namespace through the HTTP admin endpoint.
+ *
+ * Builds ".../namespaces/<ns>/topics" for V2 namespaces and
+ * ".../namespaces/<ns>/destinations" otherwise, then posts the request to the executor.
+ * A null nsName fails the future with ResultInvalidTopicName, matching the guard in
+ * BinaryProtoLookupService::getTopicsOfNamespaceAsync.
+ */
+Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr &nsName) {
+    NamespaceTopicsPromise promise;
+
+    // Guard before dereferencing nsName below.
+    if (!nsName) {
+        promise.setFailed(ResultInvalidTopicName);
+        return promise.getFuture();
+    }
+
+    std::stringstream completeUrlStream;
+
+    if (nsName->isV2()) {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
+                          << "topics";
+    } else {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
+                          << "destinations";
+    }
+
+    executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
+                                                   shared_from_this(), promise, completeUrlStream.str()));
return promise.getFuture();
}
@@ -99,19 +119,28 @@ static size_t curlWriteCallback(void *contents, size_t
size, size_t nmemb, void
return size * nmemb;
}
-void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const
std::string completeUrl,
- RequestType requestType) {
+/**
+ * Executor task: performs the HTTP request for a namespace's topics and completes the
+ * promise.
+ *
+ * A request failure is forwarded as-is. A response that cannot be parsed
+ * (parseNamespaceTopicsData() returns a null pointer) fails the promise with
+ * ResultLookupError, so listeners never receive a null topic list as a success value.
+ */
+void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
+                                                         const std::string completeUrl) {
+    std::string responseData;
+    Result result = sendHTTPRequest(completeUrl, responseData);
+
+    if (result != ResultOk) {
+        promise.setFailed(result);
+        return;
+    }
+
+    NamespaceTopicsPtr topics = parseNamespaceTopicsData(responseData);
+    if (!topics) {
+        promise.setFailed(ResultLookupError);
+        return;
+    }
+    promise.setValue(topics);
+}
+
+Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl,
std::string &responseData) {
CURL *handle;
CURLcode res;
- std::string responseData;
std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_;
handle = curl_easy_init();
if (!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
- promise.setFailed(ResultLookupError);
// No curl_easy_cleanup required since handle not initialized
- return;
+ return ResultLookupError;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
@@ -148,9 +177,8 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise
promise, const std::string
"All Authentication methods should have AuthenticationData and
return true on getAuthData for "
"url "
<< completeUrl);
- promise.setFailed(authResult);
curl_easy_cleanup(handle);
- return;
+ return authResult;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
@@ -158,7 +186,7 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise
promise, const std::string
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
- LOG_INFO("Curl Lookup Request sent for" << completeUrl);
+ LOG_INFO("Curl Lookup Request sent for " << completeUrl);
// Make get call to server
res = curl_easy_perform(handle);
@@ -166,16 +194,17 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise
promise, const std::string
// Free header list
curl_slist_free_all(list);
+ Result retResult = ResultOk;
+
switch (res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code "
<< response_code);
if (response_code == 200) {
- promise.setValue((requestType == PartitionMetaData) ?
parsePartitionData(responseData)
- :
parseLookupData(responseData));
+ retResult = ResultOk;
} else {
- promise.setFailed(ResultLookupError);
+ retResult = ResultLookupError;
}
break;
case CURLE_COULDNT_CONNECT:
@@ -183,22 +212,23 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise
promise, const std::string
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error
Code " << res);
- promise.setFailed(ResultConnectError);
+ retResult = ResultConnectError;
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error
Code " << res);
- promise.setFailed(ResultReadError);
+ retResult = ResultReadError;
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error
Code " << res);
- promise.setFailed(ResultTimeout);
+ retResult = ResultTimeout;
break;
default:
LOG_ERROR("Response failed for url " << completeUrl << ". Error
Code " << res);
- promise.setFailed(ResultLookupError);
+ retResult = ResultLookupError;
break;
}
curl_easy_cleanup(handle);
+ return retResult;
}
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string
&json) {
@@ -243,4 +273,48 @@ LookupDataResultPtr
HTTPLookupService::parseLookupData(const std::string &json)
LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
+
+/**
+ * Parse the JSON body of a topics-of-namespace response into a de-duplicated list of
+ * topic names with any "-partition-..." suffix removed.
+ *
+ * @param json response body
+ * @return the topic list, or a null pointer when the body is not valid JSON or has an
+ *         unexpected top-level shape
+ */
+NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
+    Json::Value root;
+    Json::Reader reader;
+    if (!reader.parse(json, root, false)) {
+        LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
+                                                                  << "\nInput Json = " << json);
+        return NamespaceTopicsPtr();
+    }
+
+    // NOTE(review): the Pulsar admin REST endpoint appears to return a bare JSON array of
+    // topic names -- confirm against the broker. A bare array is accepted, and the
+    // original {"topics": [...]} object lookup is kept for object-shaped payloads.
+    if (!root.isArray() && !root.isObject()) {
+        LOG_ERROR("Unexpected json for Topics of Namespace. Input Json = " << json);
+        return NamespaceTopicsPtr();
+    }
+    const Json::Value &topicsArray = root.isArray() ? root : root["topics"];
+
+    // std::set both de-duplicates the names and orders them.
+    std::set<std::string> topicSet;
+    for (Json::ArrayIndex i = 0; i < topicsArray.size(); i++) {
+        const std::string topicName = topicsArray[i].asString();
+        // Remove the partition part. When the marker is absent find() returns npos and
+        // substr(0, npos) yields the whole name.
+        const std::string::size_type pos = topicName.find("-partition-");
+        topicSet.insert(topicName.substr(0, pos));
+    }
+
+    NamespaceTopicsPtr topicsResultPtr =
+        boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+    return topicsResultPtr;
+}
+
+/**
+ * Executor task for lookup and partition-metadata requests.
+ *
+ * Performs the HTTP request. On ResultOk the promise is completed with the parsed
+ * response: parsePartitionData() for PartitionMetaData requests, parseLookupData()
+ * otherwise. Any other result fails the promise with that result.
+ */
+void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
+                                                RequestType requestType) {
+    std::string body;
+    const Result status = sendHTTPRequest(completeUrl, body);
+
+    if (status == ResultOk) {
+        if (requestType == PartitionMetaData) {
+            promise.setValue(parsePartitionData(body));
+        } else {
+            promise.setValue(parseLookupData(body));
+        }
+    } else {
+        promise.setFailed(status);
+    }
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h
b/pulsar-client-cpp/lib/HTTPLookupService.h
index d7fff33..66cd251 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -52,7 +52,12 @@ class HTTPLookupService : public LookupService, public
boost::enable_shared_from
static LookupDataResultPtr parsePartitionData(const std::string&);
static LookupDataResultPtr parseLookupData(const std::string&);
- void sendHTTPRequest(LookupPromise, const std::string, RequestType);
+ static NamespaceTopicsPtr parseNamespaceTopicsData(const std::string&);
+
+ void handleLookupHTTPRequest(LookupPromise, const std::string,
RequestType);
+ void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
const std::string completeUrl);
+
+ Result sendHTTPRequest(const std::string completeUrl, std::string&
responseData);
public:
HTTPLookupService(const std::string&, const ClientConfiguration&, const
AuthenticationPtr&);
@@ -60,6 +65,8 @@ class HTTPLookupService : public LookupService, public
boost::enable_shared_from
Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr&);
+
+ Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/LookupService.h
b/pulsar-client-cpp/lib/LookupService.h
index 48d5595..1221263 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -26,6 +26,10 @@
#include <lib/TopicName.h>
namespace pulsar {
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
+typedef boost::shared_ptr<Promise<Result, NamespaceTopicsPtr>>
NamespaceTopicsPromisePtr;
+
class LookupService {
public:
/*
@@ -42,6 +46,13 @@ class LookupService {
*/
virtual Future<Result, LookupDataResultPtr>
getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0;
+ /**
+ * @param nsName - name of the namespace to list
+ *
+ * Returns the names of all the topics in the given namespace.
+ */
+ virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) = 0;
+
virtual ~LookupService() {}
};
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 6425687..3b1d985 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -80,7 +80,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
// not supported
virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
- private:
+ protected:
const ClientImplPtr client_;
const std::string subscriptionName_;
std::string consumerStr_;
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000..95f8a36
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "PatternMultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+/**
+ * Builds a multi-topics consumer whose topic set is selected by a regular expression.
+ *
+ * `pattern` is used twice: it is parsed with TopicName::get() for the base class and
+ * compiled into a std::regex for later matching (std::regex construction throws
+ * std::regex_error on a malformed expression). The auto-discovery timer is left unset
+ * here; it is created in start().
+ */
+PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string pattern,
+                                                               const std::vector<std::string>& topics,
+                                                               const std::string& subscriptionName,
+                                                               const ConsumerConfiguration& conf,
+                                                               const LookupServicePtr lookupServicePtr_)
+    : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf,
+                              lookupServicePtr_),
+      patternString_(pattern),
+      pattern_(pattern),
+      autoDiscoveryTimer_(),
+      autoDiscoveryRunning_(false) {
+}
+
+// Returns a copy of the compiled regex this consumer matches topic names against.
+const std::regex PatternMultiTopicsConsumerImpl::getPattern() {
+    return pattern_;
+}
+
+// Marks the current discovery round as finished and re-arms the timer so that
+// autoDiscoveryTimerTask() fires again after the configured discovery period.
+void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
+    autoDiscoveryRunning_ = false;
+
+    const auto period = seconds(conf_.getPatternAutoDiscoveryPeriod());
+    autoDiscoveryTimer_->expires_from_now(period);
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+}
+
+/**
+ * Timer handler that starts one topic-discovery round.
+ *
+ * @param err  completion status delivered by the timer; any error (including
+ *             cancellation) ends the handler without re-arming the timer.
+ *
+ * When the consumer is not Ready the round is skipped and the timer is re-armed. When a
+ * previous round is still in flight the handler returns without doing anything.
+ * Otherwise the topics of the namespace are requested asynchronously and the answer is
+ * processed in timerGetTopicsOfNamespace().
+ */
+void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) {
+    if (err) {
+        if (err == boost::asio::error::operation_aborted) {
+            LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
+        } else {
+            LOG_ERROR(getName() << "Timer error: " << err.message());
+        }
+        return;
+    }
+
+    if (state_ != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    if (autoDiscoveryRunning_) {
+        LOG_DEBUG("autoDiscoveryTimerTask still running, cancel this running. ");
+        return;
+    }
+    autoDiscoveryRunning_ = true;
+
+    // The namespace was already derived from the pattern.
+    assert(namespaceName_);
+
+    Future<Result, NamespaceTopicsPtr> topicsFuture =
+        lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_);
+    topicsFuture.addListener(
+        boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2));
+}
+
+/**
+ * Listener for the namespace topic listing requested by autoDiscoveryTimerTask().
+ *
+ * @param result  outcome of the lookup; anything but ResultOk just re-arms the timer
+ * @param topics  all topics reported for the namespace
+ *
+ * Computes which matching topics are new and which subscribed topics no longer match,
+ * subscribes to the new ones first and, only if that succeeded, unsubscribes from the
+ * stale ones. Every path ends by re-arming the discovery timer.
+ */
+void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result,
+                                                               const NamespaceTopicsPtr topics) {
+    if (result != ResultOk) {
+        LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    // Topics of the namespace that match the pattern right now.
+    NamespaceTopicsPtr matched = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern_);
+
+    // Topics this consumer currently holds.
+    NamespaceTopicsPtr current = boost::make_shared<std::vector<std::string>>();
+    for (const auto& entry : topicsPartitions_) {
+        current->push_back(entry.first);
+    }
+
+    NamespaceTopicsPtr topicsAdded = topicsListsMinus(*matched, *current);
+    NamespaceTopicsPtr topicsRemoved = topicsListsMinus(*current, *matched);
+
+    // Runs once the removed topics have been unsubscribed.
+    ResultCallback topicsRemovedCallback = [this](Result unsubscribeResult) {
+        if (unsubscribeResult != ResultOk) {
+            LOG_ERROR("Failed to unsubscribe topics: " << unsubscribeResult);
+        }
+        resetAutoDiscoveryTimer();
+    };
+
+    // Runs once the added topics have been subscribed.
+    ResultCallback topicsAddedCallback = [this, topicsRemoved, topicsRemovedCallback](Result subscribeResult) {
+        if (subscribeResult != ResultOk) {
+            resetAutoDiscoveryTimer();
+            return;
+        }
+        // Subscription succeeded: now drop the topics that no longer match.
+        onTopicsRemoved(topicsRemoved, topicsRemovedCallback);
+    };
+
+    // Subscribe first; the unsubscribe step is chained from the callback above.
+    onTopicsAdded(topicsAdded, topicsAddedCallback);
+}
+
+/**
+ * Starts an asynchronous subscription for every topic in `addedTopics`.
+ *
+ * @param addedTopics  topics to subscribe to
+ * @param callback     invoked with ResultOk immediately when the list is empty; otherwise
+ *                     invoked from handleOneTopicAdded() as the subscriptions complete
+ */
+void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) {
+    if (addedTopics->empty()) {
+        LOG_DEBUG("no topics need subscribe");
+        callback(ResultOk);
+        return;
+    }
+
+    // Countdown shared by all per-topic listeners.
+    int pending = addedTopics->size();
+    boost::shared_ptr<std::atomic<int>> topicsNeedCreate = boost::make_shared<std::atomic<int>>(pending);
+
+    for (const std::string& topic : *addedTopics) {
+        MultiTopicsConsumerImpl::subscribeOneTopicAsync(topic).addListener(
+            boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, topic,
+                        topicsNeedCreate, callback));
+    }
+}
+
+/**
+ * Listener invoked when the subscription to one newly discovered topic has completed.
+ *
+ * @param result            outcome of subscribing to `topic`
+ * @param topic             topic the subscription was for (used for logging)
+ * @param topicsNeedCreate  countdown shared by all listeners of the same batch
+ * @param callback          invoked with the failing Result when `result` is not ResultOk,
+ *                          or with ResultOk by the listener that completes the batch
+ *
+ * NOTE(review): a failed subscription invokes `callback` right away, so with several
+ * topics in one batch `callback` can still run more than once; callers should tolerate it.
+ */
+void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic,
+                                                         boost::shared_ptr<std::atomic<int>> topicsNeedCreate,
+                                                         ResultCallback callback) {
+    const int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        LOG_ERROR("Failed when subscribed to topic " << topic << " Error - " << result);
+        callback(result);
+        return;
+    }
+
+    // Decide from the value returned by fetch_sub, not from a second load(): when two
+    // listeners finish concurrently both could otherwise observe 0 and both report
+    // completion. Exactly one listener sees previous == 1.
+    if (previous == 1) {
+        LOG_DEBUG("Subscribed all new added topics");
+        callback(result);
+    }
+}
+
+/**
+ * Starts an asynchronous unsubscribe for every topic in `removedTopics`.
+ *
+ * @param removedTopics  topics that no longer match the pattern
+ * @param callback       invoked with ResultOk immediately when the list is empty;
+ *                       otherwise invoked with the failing Result of an unsubscribe, or
+ *                       with ResultOk by the completion that finishes the batch
+ *
+ * NOTE(review): as with subscriptions, a failed unsubscribe reports immediately, so
+ * `callback` can run more than once for a batch that contains failures.
+ */
+void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics,
+                                                     ResultCallback callback) {
+    // start call unsubscribeOneTopicAsync for each single topic
+    if (removedTopics->empty()) {
+        LOG_DEBUG("no topics need unsubscribe");
+        callback(ResultOk);
+        return;
+    }
+    int topicsNumber = removedTopics->size();
+
+    // Countdown shared by all per-topic completions of this batch.
+    boost::shared_ptr<std::atomic<int>> topicsNeedUnsub = boost::make_shared<std::atomic<int>>(topicsNumber);
+    ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) {
+        const int previous = topicsNeedUnsub->fetch_sub(1);
+        assert(previous > 0);
+
+        if (result != ResultOk) {
+            LOG_ERROR("Failed when unsubscribe to one topic. Error - " << result);
+            callback(result);
+            return;
+        }
+
+        // Use the value returned by fetch_sub rather than re-reading the counter, so
+        // that exactly one completion reports the end of the batch even when several
+        // finish concurrently.
+        if (previous == 1) {
+            LOG_DEBUG("unSubscribed all needed topics");
+            callback(result);
+        }
+    };
+
+    // unsubscribe for each passed in topic
+    for (std::vector<std::string>::const_iterator itr = removedTopics->begin(); itr != removedTopics->end();
+         itr++) {
+        MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(*itr, oneTopicUnsubscribedCallback);
+    }
+}
+
+/**
+ * Returns the entries of `topics` that match `pattern` in full (std::regex_match),
+ * preserving their input order.
+ */
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(const std::vector<std::string>& topics,
+                                                                       const std::regex& pattern) {
+    NamespaceTopicsPtr matched = boost::make_shared<std::vector<std::string>>();
+    for (const std::string& candidate : topics) {
+        if (!std::regex_match(candidate, pattern)) {
+            continue;
+        }
+        matched->push_back(candidate);
+    }
+    return matched;
+}
+
+/**
+ * Returns the elements of `list1` that do not occur in `list2`, in `list1` order.
+ * Membership is checked with a linear search of `list2` for each element.
+ */
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsListsMinus(std::vector<std::string>& list1,
+                                                                    std::vector<std::string>& list2) {
+    NamespaceTopicsPtr difference = boost::make_shared<std::vector<std::string>>();
+    for (std::vector<std::string>::const_iterator it = list1.begin(); it != list1.end(); ++it) {
+        const bool presentInList2 = std::find(list2.begin(), list2.end(), *it) != list2.end();
+        if (!presentInList2) {
+            difference->push_back(*it);
+        }
+    }
+    return difference;
+}
+
+// Starts the underlying multi-topics consumer and, the first time it is called with a
+// positive auto-discovery period configured, creates and arms the discovery timer.
+void PatternMultiTopicsConsumerImpl::start() {
+    MultiTopicsConsumerImpl::start();
+
+    LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
+
+    // Nothing to do when the timer already exists or auto discovery is disabled.
+    if (autoDiscoveryTimer_ || conf_.getPatternAutoDiscoveryPeriod() <= 0) {
+        return;
+    }
+
+    autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+    const auto period = seconds(conf_.getPatternAutoDiscoveryPeriod());
+    autoDiscoveryTimer_->expires_from_now(period);
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+}
+
+// Marks the consumer Closed under the consumer mutex, stops auto discovery and fails
+// the pending creation promise with ResultAlreadyClosed.
+void PatternMultiTopicsConsumerImpl::shutdown() {
+    Lock lock(mutex_);
+    state_ = Closed;
+    // The timer only exists once start() has run with a positive discovery period;
+    // guard against dereferencing a null pointer otherwise.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+    multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+}
+
+// Closes the underlying multi-topics consumer (reporting through `callback`) and stops
+// auto discovery.
+void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    MultiTopicsConsumerImpl::closeAsync(callback);
+    // The timer only exists once start() has run with a positive discovery period;
+    // guard against dereferencing a null pointer otherwise.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+}
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
new file mode 100644
index 0000000..503dc99
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include <regex>
+#include "boost/enable_shared_from_this.hpp"
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+#include "MultiTopicsConsumerImpl.h"
+
+namespace pulsar {
+
+class PatternMultiTopicsConsumerImpl;
+
+// Multi-topics consumer whose topic set is selected by a regular expression. While
+// running it periodically lists the topics of the namespace, subscribes to newly
+// matching topics and unsubscribes from topics that no longer match.
+class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
+ public:
+ // Currently only topics under a single namespace are supported.
+ // `patternString` is the regex the topic names are matched against; the constructor
+ // also parses it with TopicName::get(), so it has to be written like a topic name.
+ // When subscribing, the client first gets all topics that match the given pattern;
+ // `topics` contains the topics that match `patternString`.
+ PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string
patternString,
+ const std::vector<std::string>& topics,
+ const std::string& subscriptionName, const
ConsumerConfiguration& conf,
+ const LookupServicePtr lookupServicePtr_);
+
+ // Returns a copy of the compiled pattern.
+ const std::regex getPattern();
+
+ // Timer handler that kicks off one discovery round; `err` is the timer's status.
+ void autoDiscoveryTimerTask(const boost::system::error_code& err);
+
+ // filter input `topics` with given `pattern`, return matched topics
+ static NamespaceTopicsPtr topicsPatternFilter(const
std::vector<std::string>& topics,
+ const std::regex& pattern);
+
+ // Find out topics, which are in `list1` but not in `list2`.
+ static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1,
+ std::vector<std::string>&
list2);
+
+ virtual void closeAsync(ResultCallback callback);
+ virtual void start();
+ virtual void shutdown();
+
+ private:
+ // Pattern text exactly as passed to the constructor.
+ const std::string patternString_;
+ // Compiled form of the pattern, used for matching.
+ const std::regex pattern_;
+ typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
+ // Created in start() only when the configured discovery period is positive.
+ TimerPtr autoDiscoveryTimer_;
+ // True while a discovery round is in flight; cleared by resetAutoDiscoveryTimer().
+ bool autoDiscoveryRunning_;
+
+ // Ends the current discovery round and re-arms the timer.
+ void resetAutoDiscoveryTimer();
+ // Listener for the namespace topic listing requested by the timer task.
+ void timerGetTopicsOfNamespace(const Result result, const
NamespaceTopicsPtr topics);
+ // Subscribe to / unsubscribe from the given topics, reporting through `callback`.
+ void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback
callback);
+ void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback
callback);
+ // Listener for the completion of one subscription started by onTopicsAdded().
+ void handleOneTopicAdded(const Result result, const std::string& topic,
+ boost::shared_ptr<std::atomic<int>>
topicsNeedCreate, ResultCallback callback);
+};
+
+} // namespace pulsar
+#endif // PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index cf28b34..d4c1df8 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -34,6 +34,7 @@
#include <set>
#include <vector>
#include <lib/MultiTopicsConsumerImpl.h>
+#include <lib/PatternMultiTopicsConsumerImpl.h>
#include "lib/Future.h"
#include "lib/Utils.h"
DECLARE_LOG_OBJECT()
@@ -1679,3 +1680,259 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
client.shutdown();
}
+
+// Subscribing with a pattern that is not a valid topic name (here: an unknown domain
+// prefix) must fail with ResultInvalidTopicName.
+TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) {
+    Client client(lookupUrl);
+
+    // invalid namespace
+    const std::string subName = "testPatternMultiTopicsConsumerInvalid";
+    const std::string pattern = "invalidDomain://prop/unit/ns/patternMultiTopicsConsumerInvalid.*";
+
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+
+    // Block until the asynchronous subscribe has reported its outcome.
+    Consumer consumer;
+    Result result = consumerPromise.getFuture().get(consumer);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+
+    client.shutdown();
+}
+
+// Creates 4 partitioned topics, 3 of which match the pattern, then subscribes with the
+// pattern and publishes 100 messages to each topic.
+// Verifies that the pattern consumer receives and acks exactly the 300 messages of the
+// matching topics, and that a further receive times out (nothing from the 4th topic).
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
+ Client client(lookupUrl);
+ std::string pattern =
"persistent://prop/unit/ns1/patternMultiTopicsConsumer.*";
+
+ std::string subName = "testPatternMultiTopicsConsumer";
+ std::string topicName1 =
"persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub1";
+ std::string topicName2 =
"persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub2";
+ std::string topicName3 =
"persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub3";
+ // This will not match pattern
+ std::string topicName4 =
"persistent://prop/unit/ns1/patternMultiTopicsNotMatchPubSub4";
+
+ // call admin api to make topics partitioned
+ std::string url1 =
+ adminUrl +
"admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub1/partitions";
+ std::string url2 =
+ adminUrl +
"admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub2/partitions";
+ std::string url3 =
+ adminUrl +
"admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub3/partitions";
+ std::string url4 =
+ adminUrl +
"admin/persistent/prop/unit/ns1/patternMultiTopicsNotMatchPubSub4/partitions";
+
+ // 204 and 409 are both accepted as a successful outcome of the PUT.
+ int res = makePutRequest(url1, "2");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url2, "3");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url3, "4");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url4, "4");
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ Producer producer1;
+ Result result = client.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer4;
+ result = client.createProducer(topicName4, producer4);
+ ASSERT_EQ(ResultOk, result);
+
+ LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1
producer not match");
+
+ int messageNumber = 100;
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+ consConfig.setReceiverQueueSize(10); // size for each sub-consumer
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeWithRegexAsync(pattern, subName, consConfig,
+
WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+ LOG_INFO("created topics consumer on a pattern that match 3 topics");
+
+ std::string msgContent = "msg-content";
+ LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer1.send(msg));
+ }
+
+ msgContent = "msg-content2";
+ LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer2.send(msg));
+ }
+
+ msgContent = "msg-content3";
+ LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer3.send(msg));
+ }
+
+ msgContent = "msg-content4";
+ LOG_INFO("Publishing 100 messages by producer 4 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer4.send(msg));
+ }
+
+ LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+ for (int i = 0; i < 3 * messageNumber; i++) {
+ Message m;
+ ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+ }
+ LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+ // verify no more to receive, because producer4 not match pattern
+ Message m;
+ ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+ client.shutdown();
+}
+
+// Creates a pattern consumer (auto-discovery period set to 1) before any matching topic
+// exists, then creates 4 partitioned topics, 3 of which match the pattern.
+// After sleeping 2 seconds so that auto discovery can run, publishes 100 messages to
+// each topic and verifies that the consumer receives and acks exactly the 300 messages
+// of the matching topics, and that a further receive times out.
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
+ Client client(lookupUrl);
+ std::string pattern =
"persistent://prop/unit/ns2/patternTopicsAutoConsumer.*";
+ Result result;
+ std::string subName = "testPatternTopicsAutoConsumer";
+
+ // 1. create a pattern consumer, which contains no match topics at
beginning.
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+ consConfig.setReceiverQueueSize(10); // size for each sub-consumer
+ consConfig.setPatternAutoDiscoveryPeriod(1); // set waiting time for auto
discovery
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeWithRegexAsync(pattern, subName, consConfig,
+
WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+ LOG_INFO("created pattern consumer with not match topics at beginning");
+
+ // 2. create 4 topics, in which 3 match the pattern.
+ std::string topicName1 =
"persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub1";
+ std::string topicName2 =
"persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub2";
+ std::string topicName3 =
"persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub3";
+ // This will not match pattern
+ std::string topicName4 =
"persistent://prop/unit/ns2/patternMultiTopicsNotMatchPubSub4";
+
+ // call admin api to make topics partitioned
+ std::string url1 =
+ adminUrl +
"admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub1/partitions";
+ std::string url2 =
+ adminUrl +
"admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub2/partitions";
+ std::string url3 =
+ adminUrl +
"admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub3/partitions";
+ std::string url4 =
+ adminUrl +
"admin/persistent/prop/unit/ns2/patternMultiTopicsNotMatchPubSub4/partitions";
+
+ // 204 and 409 are both accepted as a successful outcome of the PUT.
+ int res = makePutRequest(url1, "2");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url2, "3");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url3, "4");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url4, "4");
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ Producer producer1;
+ result = client.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer4;
+ result = client.createProducer(topicName4, producer4);
+ ASSERT_EQ(ResultOk, result);
+ LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1
producer not match");
+
+ // 3. wait enough time to trigger auto discovery
+ usleep(2 * 1000 * 1000);
+
+ // 4. produce data.
+ int messageNumber = 100;
+ std::string msgContent = "msg-content";
+ LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer1.send(msg));
+ }
+
+ msgContent = "msg-content2";
+ LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer2.send(msg));
+ }
+
+ msgContent = "msg-content3";
+ LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer3.send(msg));
+ }
+
+ msgContent = "msg-content4";
+ LOG_INFO("Publishing 100 messages by producer 4 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer4.send(msg));
+ }
+
+ // 5. pattern consumer already subscribed 3 topics
+ LOG_INFO("Consuming and acking 300 messages by pattern topics consumer");
+ for (int i = 0; i < 3 * messageNumber; i++) {
+ Message m;
+ ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+ }
+ LOG_INFO("Consumed and acked 300 messages by pattern topics consumer");
+
+ // verify no more to receive, because producer4 not match pattern
+ Message m;
+ ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+ client.shutdown();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 67e6af5..f706eb8 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -25,9 +25,12 @@
#include <Future.h>
#include <Utils.h>
#include "ConnectionPool.h"
+#include "HttpHelper.h"
#include <pulsar/Authentication.h>
#include <boost/exception/all.hpp>
+DECLARE_LOG_OBJECT()
+
using namespace pulsar;
TEST(BinaryLookupServiceTest, basicLookup) {
@@ -56,3 +59,67 @@ TEST(BinaryLookupServiceTest, basicLookup) {
ASSERT_TRUE(lookupData != NULL);
ASSERT_EQ(url, lookupData->getBrokerUrl());
}
+
+// Creates three partitioned topics in namespace prop/unit/ns4 and one topic in another
+// namespace, then asks a BinaryProtoLookupService for the topics of ns4 and verifies
+// that exactly the three ns4 topics are returned (by their un-partitioned names).
+TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
+ std::string url = "pulsar://localhost:8885";
+ std::string adminUrl = "http://localhost:8765/";
+ Result result;
+ // 1. create some topics under same namespace
+ Client client(url);
+
+ std::string topicName1 =
"persistent://prop/unit/ns4/basicGetNamespaceTopics1";
+ std::string topicName2 =
"persistent://prop/unit/ns4/basicGetNamespaceTopics2";
+ std::string topicName3 =
"persistent://prop/unit/ns4/basicGetNamespaceTopics3";
+ // This is not in same namespace.
+ std::string topicName4 =
"persistent://prop/unit/ns2/basicGetNamespaceTopics4";
+
+ // call admin api to make topics partitioned
+ std::string url1 = adminUrl +
"admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions";
+ std::string url2 = adminUrl +
"admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions";
+ std::string url3 = adminUrl +
"admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions";
+
+ // 204 and 409 are both accepted as a successful outcome of the PUT.
+ int res = makePutRequest(url1, "2");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url2, "3");
+ ASSERT_FALSE(res != 204 && res != 409);
+ res = makePutRequest(url3, "4");
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ Producer producer1;
+ result = client.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer4;
+ result = client.createProducer(topicName4, producer4);
+ ASSERT_EQ(ResultOk, result);
+
+ // 2. call getTopicsOfNamespaceAsync
+ // NOTE(review): `service` is not used below; the lookup service is built on
+ // `ioExecutorProvider_` instead.
+ ExecutorServiceProviderPtr service =
boost::make_shared<ExecutorServiceProvider>(1);
+ AuthenticationPtr authData = AuthFactory::Disabled();
+ ClientConfiguration conf;
+ ExecutorServiceProviderPtr
ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1));
+ ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
+ BinaryProtoLookupService lookupService(pool_, url);
+
+ // The namespace to query is taken from the first topic's name.
+ TopicNamePtr topicName = TopicName::get(topicName1);
+ NamespaceNamePtr nsName = topicName->getNamespaceName();
+
+ Future<Result, NamespaceTopicsPtr> getTopicsFuture =
lookupService.getTopicsOfNamespaceAsync(nsName);
+ NamespaceTopicsPtr topicsData;
+ result = getTopicsFuture.get(topicsData);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_TRUE(topicsData != NULL);
+
+ // 3. verify the result contains exactly the first 3 topics
+ ASSERT_EQ(topicsData->size(), 3);
+ ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1)
!= topicsData->end());
+ ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2)
!= topicsData->end());
+ ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3)
!= topicsData->end());
+
+ client.shutdown();
+}