This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b23134c Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe (#2219) b23134c is described below commit b23134c422456f45ed14c3c49de53eb8889e45fc Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Sat Aug 4 14:56:56 2018 +0800 Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe (#2219) In PR #1279 and #1298 we added regex based subscription. This is a catch up work to add `PatternMultiTopicsConsumerImpl` in cpp client. --- pulsar-client-cpp/include/pulsar/Client.h | 16 ++ .../include/pulsar/ConsumerConfiguration.h | 10 + pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 42 ++++ pulsar-client-cpp/lib/BinaryProtoLookupService.h | 9 + pulsar-client-cpp/lib/Client.cc | 24 ++ pulsar-client-cpp/lib/ClientConnection.cc | 86 ++++++- pulsar-client-cpp/lib/ClientConnection.h | 13 +- pulsar-client-cpp/lib/ClientImpl.cc | 58 +++++ pulsar-client-cpp/lib/ClientImpl.h | 8 + pulsar-client-cpp/lib/Commands.cc | 18 ++ pulsar-client-cpp/lib/Commands.h | 1 + pulsar-client-cpp/lib/ConsumerConfiguration.cc | 6 + pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 +- pulsar-client-cpp/lib/HTTPLookupService.cc | 112 +++++++-- pulsar-client-cpp/lib/HTTPLookupService.h | 9 +- pulsar-client-cpp/lib/LookupService.h | 11 + pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 2 +- .../lib/PatternMultiTopicsConsumerImpl.cc | 235 +++++++++++++++++++ .../lib/PatternMultiTopicsConsumerImpl.h | 76 ++++++ pulsar-client-cpp/tests/BasicEndToEndTest.cc | 257 +++++++++++++++++++++ pulsar-client-cpp/tests/BinaryLookupServiceTest.cc | 67 ++++++ 21 files changed, 1032 insertions(+), 32 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 6a9e487..1913851 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -99,6 +99,9 @@ class Client { void subscribeAsync(const std::string& 
topic, const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback); + /** + * subscribe for multiple topics under the same namespace. + */ Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, Consumer& consumer); Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, @@ -109,6 +112,19 @@ class Client { const ConsumerConfiguration& conf, SubscribeCallback callback); /** + * subscribe for multiple topics, which match given regexPattern, under the same namespace. + */ + Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName, + Consumer& consumer); + Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, Consumer& consumer); + + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + SubscribeCallback callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + + /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified * topic. * <p> diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index c9584e3..36e5808 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -152,6 +152,16 @@ class ConsumerConfiguration { bool isReadCompacted() const; void setReadCompacted(bool compacted); + /** + * Set the time duration in seconds, for which the PatternMultiTopicsConsumer will do a pattern auto + * discovery. + * The default value is 60 seconds. less than 0 will disable auto discovery. 
+ * + * @param periodInSeconds period in seconds to do an auto discovery + */ + void setPatternAutoDiscoveryPeriod(int periodInSeconds); + int getPatternAutoDiscoveryPeriod() const; + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index c4bef30..296faee 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() { Lock lock(mutex_); return ++requestIdGenerator_; } + +Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName) { + NamespaceTopicsPromisePtr promise = boost::make_shared<Promise<Result, NamespaceTopicsPtr>>(); + if (!nsName) { + promise->setFailed(ResultInvalidTopicName); + return promise->getFuture(); + } + std::string namespaceName = nsName->toString(); + Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); + future.addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this, + namespaceName, _1, _2, promise)); + return promise->getFuture(); +} + +void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, + const ClientConnectionWeakPtr& clientCnx, + NamespaceTopicsPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(ResultConnectError); + return; + } + + ClientConnectionPtr conn = clientCnx.lock(); + uint64_t requestId = newRequestId(); + LOG_DEBUG("sendGetTopicsOfNamespaceRequest. 
requestId: " << requestId << " nsName: " << nsName); + + conn->newGetTopicsOfNamespace(nsName, requestId) + .addListener( + boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, _2, promise)); +} + +void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, + NamespaceTopicsPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(ResultLookupError); + return; + } + + promise->setValue(topicsPtr); +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h index f647c62..36fb5e8 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h @@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService { Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName); + Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); + private: boost::mutex mutex_; uint64_t requestIdGenerator_; @@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise); + void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, + const ClientConnectionWeakPtr& clientCnx, + NamespaceTopicsPromisePtr promise); + + void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, + NamespaceTopicsPromisePtr promise); + uint64_t newRequestId(); }; typedef boost::shared_ptr<BinaryProtoLookupService> BinaryProtoLookupServicePtr; diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index bba3520..5cfe01f 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -114,6 +114,30 @@ void Client::subscribeAsync(const std::vector<std::string>& topics, const std::s impl_->subscribeAsync(topics, subscriptionName, conf, callback); } 
+Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, + Consumer& consumer) { + return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer); +} + +Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, + const ConsumerConfiguration& conf, Consumer& consumer) { + Promise<Result, Consumer> promise; + subscribeWithRegexAsync(regexPattern, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise)); + Future<Result, Consumer> future = promise.getFuture(); + + return future.get(consumer); +} + +void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, + SubscribeCallback callback) { + subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), callback); +} + +void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeCallback callback) { + impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback); +} + Result Client::createReader(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, Reader& reader) { Promise<Result, Reader> promise; diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 4e6d0f2..da30707 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte } void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) { - std::vector<Promise<Result, BrokerConsumerStatsImpl> > consumerStatsPromises; + std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises; Lock lock(mutex_); for (int i = 0; i < consumerStatsRequests.size(); i++) { @@ -856,6 +856,7 @@ void 
ClientConnection::handleIncomingCommand() { << " -- req_id: " << error.request_id()); Lock lock(mutex_); + PendingRequestsMap::iterator it = pendingRequests_.find(error.request_id()); if (it != pendingRequests_.end()) { PendingRequestData requestData = it->second; @@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() { requestData.promise.setFailed(getResult(error.error())); requestData.timer->cancel(); } else { - PendingGetLastMessageIdRequestsMap::iterator it2 = + PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); - if (it2 != pendingGetLastMessageIdRequests_.end()) { - Promise<Result, MessageId> getLastMessageIdPromise = it2->second; - pendingGetLastMessageIdRequests_.erase(it2); + if (it != pendingGetLastMessageIdRequests_.end()) { + Promise<Result, MessageId> getLastMessageIdPromise = it->second; + pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); getLastMessageIdPromise.setFailed(getResult(error.error())); } else { - lock.unlock(); + PendingGetNamespaceTopicsMap::iterator it = + pendingGetNamespaceTopicsRequests_.find(error.request_id()); + if (it != pendingGetNamespaceTopicsRequests_.end()) { + Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise = it->second; + pendingGetNamespaceTopicsRequests_.erase(it); + lock.unlock(); + + getNamespaceTopicsPromise.setFailed(getResult(error.error())); + } else { + lock.unlock(); + } } } - break; } @@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() { break; } + case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: { + const CommandGetTopicsOfNamespaceResponse& response = + incomingCmd_.gettopicsofnamespaceresponse(); + + LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. 
req_id: " + << response.request_id() << " topicsSize" << response.topics_size()); + + Lock lock(mutex_); + PendingGetNamespaceTopicsMap::iterator it = + pendingGetNamespaceTopicsRequests_.find(response.request_id()); + + if (it != pendingGetNamespaceTopicsRequests_.end()) { + Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second; + pendingGetNamespaceTopicsRequests_.erase(it); + lock.unlock(); + + int numTopics = response.topics_size(); + std::set<std::string> topicSet; + // get all topics + for (int i = 0; i < numTopics; i++) { + // remove partition part + const std::string& topicName = response.topics(i); + int pos = topicName.find("-partition-"); + std::string filteredName = topicName.substr(0, pos); + + // filter duped topic name + if (topicSet.find(filteredName) == topicSet.end()) { + topicSet.insert(filteredName); + } + } + + NamespaceTopicsPtr topicsPtr = + boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end()); + + getTopicsPromise.setValue(topicsPtr); + } else { + lock.unlock(); + LOG_WARN( + "GetTopicsOfNamespaceResponse command - Received unknown request id from " + "server: " + << response.request_id()); + } + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); @@ -1281,4 +1336,21 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume return promise.getFuture(); } +Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName, + uint64_t requestId) { + Lock lock(mutex_); + Promise<Result, NamespaceTopicsPtr> promise; + if (isClosed()) { + lock.unlock(); + LOG_ERROR(cnxString_ << "Client is not connected to the broker"); + promise.setFailed(ResultNotConnected); + return promise.getFuture(); + } + + pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise)); + lock.unlock(); + sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId)); + return promise.getFuture(); +} + } // 
namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 860ca6a..bdbc8e5 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -70,6 +70,8 @@ struct OpSendMsg; typedef std::pair<std::string, int64_t> ResponseData; +typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr; + class ClientConnection : public boost::enable_shared_from_this<ClientConnection> { enum State { @@ -81,7 +83,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> public: typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr; - typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> > TlsSocketPtr; + typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> TlsSocketPtr; typedef boost::shared_ptr<ClientConnection> ConnectionPtr; typedef boost::function<void(const boost::system::error_code&, ConnectionPtr)> ConnectionListener; typedef std::vector<ConnectionListener>::iterator ListenerIterator; @@ -144,6 +146,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + private: struct PendingRequestData { Promise<Result, ResponseData> promise; @@ -264,12 +268,15 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap; ConsumersMap consumers_; - typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > PendingConsumerStatsMap; + typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map<long, Promise<Result, MessageId> > PendingGetLastMessageIdRequestsMap; + 
typedef std::map<long, Promise<Result, MessageId>> PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; + typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap; + PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; + boost::mutex mutex_; typedef boost::unique_lock<boost::mutex> Lock; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 3768926..ec113fc 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -25,6 +25,7 @@ #include "PartitionedProducerImpl.h" #include "PartitionedConsumerImpl.h" #include "MultiTopicsConsumerImpl.h" +#include "PatternMultiTopicsConsumerImpl.h" #include "SimpleLoggerImpl.h" #include "Log4CxxLogger.h" #include <boost/bind.hpp> @@ -35,6 +36,7 @@ #include <lib/HTTPLookupService.h> #include <lib/TopicName.h> #include <algorithm> +#include <regex> DECLARE_LOG_OBJECT() @@ -119,6 +121,9 @@ ExecutorServiceProviderPtr ClientImpl::getListenerExecutorProvider() { return li ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { return partitionListenerExecutorProvider_; } + +LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; } + void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf, CreateProducerCallback callback) { TopicNamePtr topicName; @@ -212,6 +217,59 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat consumers_.push_back(reader->getConsumer()); } +void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback) { + TopicNamePtr topicNamePtr = TopicName::get(regexPattern); + + Lock lock(mutex_); + if (state_ != Open) { + lock.unlock(); + callback(ResultAlreadyClosed, Consumer()); + return; + } else { + lock.unlock(); + if (!topicNamePtr) { + 
LOG_ERROR("Topic pattern not valid: " << regexPattern); + callback(ResultInvalidTopicName, Consumer()); + return; + } + } + + NamespaceNamePtr nsName = topicNamePtr->getNamespaceName(); + + lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener( + boost::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), _1, _2, regexPattern, + consumerName, conf, callback)); +} + +void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, + const std::string& regexPattern, + const std::string& consumerName, + const ConsumerConfiguration& conf, + SubscribeCallback callback) { + if (result == ResultOk) { + ConsumerImplBasePtr consumer; + + std::regex pattern(regexPattern); + + NamespaceTopicsPtr matchTopics = + PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern); + + consumer = boost::make_shared<PatternMultiTopicsConsumerImpl>( + shared_from_this(), regexPattern, *matchTopics, consumerName, conf, lookupServicePtr_); + + consumer->getConsumerCreatedFuture().addListener( + boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer)); + Lock lock(mutex_); + consumers_.push_back(consumer); + lock.unlock(); + consumer->start(); + } else { + LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result); + callback(result, Consumer()); + } +} + void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback) { TopicNamePtr topicNamePtr; diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 550298b..54d459d 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -60,6 +60,9 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { void subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName, const 
ConsumerConfiguration& conf, SubscribeCallback callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, ReaderCallback callback); @@ -82,6 +85,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); + LookupServicePtr getLookup(); friend class PulsarFriend; private: @@ -106,6 +110,10 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { void handleClose(Result result, SharedInt remaining, ResultCallback callback); + void createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, + const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + enum State { Open, diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 13bf99a..8a1933b 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -324,6 +324,18 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t request return buffer; } +SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) { + BaseCommand cmd; + cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE); + CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace(); + getTopics->set_request_id(requestId); + getTopics->set_namespace_(nsName); + + const SharedBuffer buffer = writeMessageWithSize(cmd); + cmd.clear_gettopicsofnamespace(); + return buffer; +} + std::string Commands::messageType(BaseCommand_Type type) { switch (type) { case BaseCommand::CONNECT: @@ -416,6 +428,12 @@ 
std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: return "GET_LAST_MESSAGE_ID_RESPONSE"; break; + case BaseCommand::GET_TOPICS_OF_NAMESPACE: + return "GET_TOPICS_OF_NAMESPACE"; + break; + case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: + return "GET_TOPICS_OF_NAMESPACE_RESPONSE"; + break; }; } diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 53fb1bb..d9b8589 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -112,6 +112,7 @@ class Commands { static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId); static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); private: Commands(); diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 0c145c1..058ca57 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -105,4 +105,10 @@ bool ConsumerConfiguration::isReadCompacted() const { return impl_->readCompacte void ConsumerConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; } +void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) { + impl_->patternAutoDiscoveryPeriod = periodInSeconds; +} + +int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index eb0c374..0cc0c72 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -35,6 +35,7 @@ struct ConsumerConfigurationImpl { CryptoKeyReaderPtr cryptoKeyReader; ConsumerCryptoFailureAction cryptoFailureAction; 
bool readCompacted; + int patternAutoDiscoveryPeriod; ConsumerConfigurationImpl() : unAckedMessagesTimeoutMs(0), consumerType(ConsumerExclusive), @@ -45,7 +46,8 @@ struct ConsumerConfigurationImpl { maxTotalReceiverQueueSizeAcrossPartitions(50000), cryptoKeyReader(), cryptoFailureAction(ConsumerCryptoFailureAction::FAIL), - readCompacted(false) {} + readCompacted(false), + patternAutoDiscoveryPeriod(60) {} }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 36d11f5..fe27e8d 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -68,8 +68,9 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::st << topicName->getEncodedLocalName(); } - executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), - promise, completeUrlStream.str(), Lookup)); + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest, + shared_from_this(), promise, completeUrlStream.str(), + Lookup)); return promise.getFuture(); } @@ -89,8 +90,27 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync << '/' << PARTITION_METHOD_NAME; } - executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), - promise, completeUrlStream.str(), PartitionMetaData)); + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest, + shared_from_this(), promise, completeUrlStream.str(), + PartitionMetaData)); + return promise.getFuture(); +} + +Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync( + const NamespaceNamePtr &nsName) { + NamespaceTopicsPromise promise; + std::stringstream completeUrlStream; + + if (nsName->isV2()) { + completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << 
nsName->toString() << '/' + << "topics"; + } else { + completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/' + << "destinations"; + } + + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest, + shared_from_this(), promise, completeUrlStream.str())); return promise.getFuture(); } @@ -99,19 +119,28 @@ static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void return size * nmemb; } -void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl, - RequestType requestType) { +void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, + const std::string completeUrl) { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result != ResultOk) { + promise.setFailed(result); + } else { + promise.setValue(parseNamespaceTopicsData(responseData)); + } +} + +Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::string &responseData) { CURL *handle; CURLcode res; - std::string responseData; std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_; handle = curl_easy_init(); if (!handle) { LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); - promise.setFailed(ResultLookupError); // No curl_easy_cleanup required since handle not initialized - return; + return ResultLookupError; } // set URL curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str()); @@ -148,9 +177,8 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string "All Authentication methods should have AuthenticationData and return true on getAuthData for " "url " << completeUrl); - promise.setFailed(authResult); curl_easy_cleanup(handle); - return; + return authResult; } struct curl_slist *list = NULL; if (authDataContent->hasDataForHttp()) { @@ -158,7 +186,7 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const 
std::string } curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list); - LOG_INFO("Curl Lookup Request sent for" << completeUrl); + LOG_INFO("Curl Lookup Request sent for " << completeUrl); // Make get call to server res = curl_easy_perform(handle); @@ -166,16 +194,17 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string // Free header list curl_slist_free_all(list); + Result retResult = ResultOk; + switch (res) { case CURLE_OK: long response_code; curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); LOG_INFO("Response received for url " << completeUrl << " code " << response_code); if (response_code == 200) { - promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) - : parseLookupData(responseData)); + retResult = ResultOk; } else { - promise.setFailed(ResultLookupError); + retResult = ResultLookupError; } break; case CURLE_COULDNT_CONNECT: @@ -183,22 +212,23 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string case CURLE_COULDNT_RESOLVE_HOST: case CURLE_HTTP_RETURNED_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultConnectError); + retResult = ResultConnectError; break; case CURLE_READ_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultReadError); + retResult = ResultReadError; break; case CURLE_OPERATION_TIMEDOUT: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultTimeout); + retResult = ResultTimeout; break; default: LOG_ERROR("Response failed for url " << completeUrl << ". 
Error Code " << res); - promise.setFailed(ResultLookupError); + retResult = ResultLookupError; break; } curl_easy_cleanup(handle); + return retResult; } LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) { @@ -243,4 +273,48 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) LOG_INFO("parseLookupData = " << *lookupDataResultPtr); return lookupDataResultPtr; } + +NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) { + Json::Value root; + Json::Reader reader; + if (!reader.parse(json, root, false)) { + LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages() + << "\nInput Json = " << json); + return NamespaceTopicsPtr(); + } + + Json::Value topicsArray = root["topics"]; + std::set<std::string> topicSet; + // get all topics + for (int i = 0; i < topicsArray.size(); i++) { + // remove partition part + const std::string &topicName = topicsArray[i].asString(); + int pos = topicName.find("-partition-"); + std::string filteredName = topicName.substr(0, pos); + + // filter duped topic name + if (topicSet.find(filteredName) == topicSet.end()) { + topicSet.insert(filteredName); + } + } + + NamespaceTopicsPtr topicsResultPtr = + boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end()); + + return topicsResultPtr; +} + +void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl, + RequestType requestType) { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result != ResultOk) { + promise.setFailed(result); + } else { + promise.setValue((requestType == PartitionMetaData) ? 
parsePartitionData(responseData) + : parseLookupData(responseData)); + } +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h index d7fff33..66cd251 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.h +++ b/pulsar-client-cpp/lib/HTTPLookupService.h @@ -52,7 +52,12 @@ class HTTPLookupService : public LookupService, public boost::enable_shared_from static LookupDataResultPtr parsePartitionData(const std::string&); static LookupDataResultPtr parseLookupData(const std::string&); - void sendHTTPRequest(LookupPromise, const std::string, RequestType); + static NamespaceTopicsPtr parseNamespaceTopicsData(const std::string&); + + void handleLookupHTTPRequest(LookupPromise, const std::string, RequestType); + void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, const std::string completeUrl); + + Result sendHTTPRequest(const std::string completeUrl, std::string& responseData); public: HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); @@ -60,6 +65,8 @@ class HTTPLookupService : public LookupService, public boost::enable_shared_from Future<Result, LookupDataResultPtr> lookupAsync(const std::string&); Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&); + + Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h index 48d5595..1221263 100644 --- a/pulsar-client-cpp/lib/LookupService.h +++ b/pulsar-client-cpp/lib/LookupService.h @@ -26,6 +26,10 @@ #include <lib/TopicName.h> namespace pulsar { +typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr; +typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise; +typedef boost::shared_ptr<Promise<Result, NamespaceTopicsPtr>> NamespaceTopicsPromisePtr; + class LookupService { public: /* @@ -42,6 
+46,13 @@ class LookupService { */ virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; + /** + * @param nsName - the namespace whose topics are listed + * + * Returns the names of all topics in the given namespace. + */ + virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0; + virtual ~LookupService() {} }; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 6425687..3b1d985 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -80,7 +80,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, // not supported virtual void seekAsync(const MessageId& msgId, ResultCallback callback); - private: + protected: const ClientImplPtr client_; const std::string subscriptionName_; std::string consumerStr_; diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc new file mode 100644 index 0000000..95f8a36 --- /dev/null +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "PatternMultiTopicsConsumerImpl.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; + +PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr client, + const std::string pattern, + const std::vector<std::string>& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf, + const LookupServicePtr lookupServicePtr_) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, + lookupServicePtr_), + patternString_(pattern), + pattern_(std::regex(pattern)), + autoDiscoveryTimer_(), + autoDiscoveryRunning_(false) {} + +const std::regex PatternMultiTopicsConsumerImpl::getPattern() { return pattern_; } + +void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { + autoDiscoveryRunning_ = false; + autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->async_wait( + boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1)); +} + +void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) { + if (err == boost::asio::error::operation_aborted) { + LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); + return; + } else if (err) { + LOG_ERROR(getName() << "Timer error: " << err.message()); + return; + } + + if (state_ != Ready) { + LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_); + resetAutoDiscoveryTimer(); + return; + } + + if (autoDiscoveryRunning_) { + LOG_DEBUG("autoDiscoveryTimerTask still running, cancel this running. "); + return; + } + + autoDiscoveryRunning_ = true; + + // already get namespace from pattern. 
+ assert(namespaceName_); + + lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_) + .addListener(boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2)); +} + +void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result, + const NamespaceTopicsPtr topics) { + if (result != ResultOk) { + LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result); + resetAutoDiscoveryTimer(); + return; + } + + NamespaceTopicsPtr newTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern_); + // get old topics in consumer: + NamespaceTopicsPtr oldTopics = boost::make_shared<std::vector<std::string>>(); + for (std::map<std::string, int>::iterator it = topicsPartitions_.begin(); it != topicsPartitions_.end(); + it++) { + oldTopics->push_back(it->first); + } + NamespaceTopicsPtr topicsAdded = topicsListsMinus(*newTopics, *oldTopics); + NamespaceTopicsPtr topicsRemoved = topicsListsMinus(*oldTopics, *newTopics); + + // callback method when removed topics all un-subscribed. + ResultCallback topicsRemovedCallback = [this](Result result) { + if (result != ResultOk) { + LOG_ERROR("Failed to unsubscribe topics: " << result); + } + resetAutoDiscoveryTimer(); + }; + + // callback method when added topics all subscribed. + ResultCallback topicsAddedCallback = [this, topicsRemoved, topicsRemovedCallback](Result result) { + if (result == ResultOk) { + // call to unsubscribe all removed topics. 
+ onTopicsRemoved(topicsRemoved, topicsRemovedCallback); + } else { + resetAutoDiscoveryTimer(); + } + }; + + // call to subscribe new added topics, then in its callback do unsubscribe + onTopicsAdded(topicsAdded, topicsAddedCallback); +} + +void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) { + // start call subscribeOneTopicAsync for each single topic + + if (addedTopics->empty()) { + LOG_DEBUG("no topics need subscribe"); + callback(ResultOk); + return; + } + int topicsNumber = addedTopics->size(); + + boost::shared_ptr<std::atomic<int>> topicsNeedCreate = boost::make_shared<std::atomic<int>>(topicsNumber); + // subscribe for each passed in topic + for (std::vector<std::string>::const_iterator itr = addedTopics->begin(); itr != addedTopics->end(); + itr++) { + MultiTopicsConsumerImpl::subscribeOneTopicAsync(*itr).addListener( + boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, *itr, + topicsNeedCreate, callback)); + } +} + +void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic, + boost::shared_ptr<std::atomic<int>> topicsNeedCreate, + ResultCallback callback) { + int previous = topicsNeedCreate->fetch_sub(1); + assert(previous > 0); + + if (result != ResultOk) { + LOG_ERROR("Failed when subscribed to topic " << topic << " Error - " << result); + callback(result); + return; + } + + if (topicsNeedCreate->load() == 0) { + LOG_DEBUG("Subscribed all new added topics"); + callback(result); + } +} + +void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics, + ResultCallback callback) { + // call unsubscribeOneTopicAsync for each removed topic + if (removedTopics->empty()) { + LOG_DEBUG("no topics need unsubscribe"); + callback(ResultOk); + return; + } + int topicsNumber = removedTopics->size(); + + boost::shared_ptr<std::atomic<int>> topicsNeedUnsub = 
boost::make_shared<std::atomic<int>>(topicsNumber); + 
ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) { + int previous = topicsNeedUnsub->fetch_sub(1); + assert(previous > 0); + + if (result != ResultOk) { + LOG_ERROR("Failed when unsubscribe to one topic. Error - " << result); + callback(result); + return; + } + + if (topicsNeedUnsub->load() == 0) { + LOG_DEBUG("unSubscribed all needed topics"); + callback(result); + } + }; + + // unsubscribe for each passed in topic + for (std::vector<std::string>::const_iterator itr = removedTopics->begin(); itr != removedTopics->end(); + itr++) { + MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(*itr, oneTopicUnsubscribedCallback); + } +} + +NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(const std::vector<std::string>& topics, + const std::regex& pattern) { + NamespaceTopicsPtr topicsResultPtr = boost::make_shared<std::vector<std::string>>(); + + for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) { + if (std::regex_match(*itr, pattern)) { + topicsResultPtr->push_back(*itr); + } + } + return topicsResultPtr; +} + +NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsListsMinus(std::vector<std::string>& list1, + std::vector<std::string>& list2) { + NamespaceTopicsPtr topicsResultPtr = boost::make_shared<std::vector<std::string>>(); + std::remove_copy_if(list1.begin(), list1.end(), std::back_inserter(*topicsResultPtr), + [&list2](const std::string& arg) { + return (std::find(list2.begin(), list2.end(), arg) != list2.end()); + }); + + return topicsResultPtr; +} + +void PatternMultiTopicsConsumerImpl::start() { + MultiTopicsConsumerImpl::start(); + + LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); + + // Init autoDiscoveryTimer task only once, wait for the timeout to happen + if (!autoDiscoveryTimer_ && conf_.getPatternAutoDiscoveryPeriod() > 0) { + autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer(); + 
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->async_wait( + boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1)); + } +} + +void PatternMultiTopicsConsumerImpl::shutdown() { + Lock lock(mutex_); + state_ = Closed; + autoDiscoveryTimer_->cancel(); + multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed); +} + +void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { + MultiTopicsConsumerImpl::closeAsync(callback); + autoDiscoveryTimer_->cancel(); +} diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h new file mode 100644 index 0000000..503dc99 --- /dev/null +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER +#define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER +#include "ConsumerImpl.h" +#include "ClientImpl.h" +#include <regex> +#include "boost/enable_shared_from_this.hpp" +#include <lib/TopicName.h> +#include <lib/NamespaceName.h> +#include "MultiTopicsConsumerImpl.h" + +namespace pulsar { + +class PatternMultiTopicsConsumerImpl; + +class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { + public: + // currently we support topics under same namespace, so `patternString` is a regex, + // which only contains after namespace part. + // when subscribe, client will first get all topics that match given pattern. + // `topics` contains the topics that match `patternString`. + PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string patternString, + const std::vector<std::string>& topics, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + const LookupServicePtr lookupServicePtr_); + + const std::regex getPattern(); + + void autoDiscoveryTimerTask(const boost::system::error_code& err); + + // filter input `topics` with given `pattern`, return matched topics + static NamespaceTopicsPtr topicsPatternFilter(const std::vector<std::string>& topics, + const std::regex& pattern); + + // Find out topics, which are in `list1` but not in `list2`. 
+ static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1, + std::vector<std::string>& list2); + + virtual void closeAsync(ResultCallback callback); + virtual void start(); + virtual void shutdown(); + + private: + const std::string patternString_; + const std::regex pattern_; + typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr; + TimerPtr autoDiscoveryTimer_; + bool autoDiscoveryRunning_; + + void resetAutoDiscoveryTimer(); + void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics); + void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback); + void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback callback); + void handleOneTopicAdded(const Result result, const std::string& topic, + boost::shared_ptr<std::atomic<int>> topicsNeedCreate, ResultCallback callback); +}; + +} // namespace pulsar +#endif // PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index cf28b34..d4c1df8 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -34,6 +34,7 @@ #include <set> #include <vector> #include <lib/MultiTopicsConsumerImpl.h> +#include <lib/PatternMultiTopicsConsumerImpl.h> #include "lib/Future.h" #include "lib/Utils.h" DECLARE_LOG_OBJECT() @@ -1679,3 +1680,259 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { client.shutdown(); } + +TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) { + Client client(lookupUrl); + + // invalid namespace + std::string pattern = "invalidDomain://prop/unit/ns/patternMultiTopicsConsumerInvalid.*"; + std::string subName = "testPatternMultiTopicsConsumerInvalid"; + + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = 
consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultInvalidTopicName, result); + + client.shutdown(); +} + +// create 4 topics, in which 3 topics match the pattern, +// verify PatternMultiTopicsConsumer subscribed matched topics, +// and only receive messages from matched topics. +TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { + Client client(lookupUrl); + std::string pattern = "persistent://prop/unit/ns1/patternMultiTopicsConsumer.*"; + + std::string subName = "testPatternMultiTopicsConsumer"; + std::string topicName1 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub1"; + std::string topicName2 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub2"; + std::string topicName3 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub3"; + // This will not match pattern + std::string topicName4 = "persistent://prop/unit/ns1/patternMultiTopicsNotMatchPubSub4"; + + // call admin api to make topics partitioned + std::string url1 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub1/partitions"; + std::string url2 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub2/partitions"; + std::string url3 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub3/partitions"; + std::string url4 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsNotMatchPubSub4/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url4, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + Result result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, 
result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match"); + + int messageNumber = 100; + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setReceiverQueueSize(10); // size for each sub-consumer + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, consConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + LOG_INFO("created topics consumer on a pattern that match 3 topics"); + + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 100 messages by producer 1 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer1.send(msg)); + } + + msgContent = "msg-content2"; + LOG_INFO("Publishing 100 messages by producer 2 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer2.send(msg)); + } + + msgContent = "msg-content3"; + LOG_INFO("Publishing 100 messages by producer 3 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer3.send(msg)); + } + + msgContent = 
"msg-content4"; + LOG_INFO("Publishing 100 messages by producer 4 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer4.send(msg)); + } + + LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer"); + for (int i = 0; i < 3 * messageNumber; i++) { + Message m; + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer"); + + // verify no more to receive, because producer4 not match pattern + Message m; + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + + client.shutdown(); +} + +// create a pattern consumer, which contains no match topics at beginning. +// create 4 topics, in which 3 topics match the pattern. +// verify PatternMultiTopicsConsumer subscribed matched topics, after a while, +// and only receive messages from matched topics. +TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { + Client client(lookupUrl); + std::string pattern = "persistent://prop/unit/ns2/patternTopicsAutoConsumer.*"; + Result result; + std::string subName = "testPatternTopicsAutoConsumer"; + + // 1. create a pattern consumer, which contains no match topics at beginning. 
+ ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setReceiverQueueSize(10); // size for each sub-consumer + consConfig.setPatternAutoDiscoveryPeriod(1); // set waiting time for auto discovery + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, consConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + LOG_INFO("created pattern consumer with not match topics at beginning"); + + // 2. create 4 topics, in which 3 match the pattern. + std::string topicName1 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub1"; + std::string topicName2 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub2"; + std::string topicName3 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub3"; + // This will not match pattern + std::string topicName4 = "persistent://prop/unit/ns2/patternMultiTopicsNotMatchPubSub4"; + + // call admin api to make topics partitioned + std::string url1 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub1/partitions"; + std::string url2 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub2/partitions"; + std::string url3 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub3/partitions"; + std::string url4 = + adminUrl + "admin/persistent/prop/unit/ns2/patternMultiTopicsNotMatchPubSub4/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url4, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + result = 
client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match"); + + // 3. wait enough time to trigger auto discovery + usleep(2 * 1000 * 1000); + + // 4. produce data. + int messageNumber = 100; + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 100 messages by producer 1 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer1.send(msg)); + } + + msgContent = "msg-content2"; + LOG_INFO("Publishing 100 messages by producer 2 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer2.send(msg)); + } + + msgContent = "msg-content3"; + LOG_INFO("Publishing 100 messages by producer 3 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer3.send(msg)); + } + + msgContent = "msg-content4"; + LOG_INFO("Publishing 100 messages by producer 4 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer4.send(msg)); + } + + // 5. 
pattern consumer already subscribed 3 topics + LOG_INFO("Consuming and acking 300 messages by pattern topics consumer"); + for (int i = 0; i < 3 * messageNumber; i++) { + Message m; + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + LOG_INFO("Consumed and acked 300 messages by pattern topics consumer"); + + // verify no more to receive, because producer4 not match pattern + Message m; + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + + client.shutdown(); +} \ No newline at end of file diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc index 67e6af5..f706eb8 100644 --- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc +++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc @@ -25,9 +25,12 @@ #include <Future.h> #include <Utils.h> #include "ConnectionPool.h" +#include "HttpHelper.h" #include <pulsar/Authentication.h> #include <boost/exception/all.hpp> +DECLARE_LOG_OBJECT() + using namespace pulsar; TEST(BinaryLookupServiceTest, basicLookup) { @@ -56,3 +59,67 @@ TEST(BinaryLookupServiceTest, basicLookup) { ASSERT_TRUE(lookupData != NULL); ASSERT_EQ(url, lookupData->getBrokerUrl()); } + +TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { + std::string url = "pulsar://localhost:8885"; + std::string adminUrl = "http://localhost:8765/"; + Result result; + // 1. create some topics under same namespace + Client client(url); + + std::string topicName1 = "persistent://prop/unit/ns4/basicGetNamespaceTopics1"; + std::string topicName2 = "persistent://prop/unit/ns4/basicGetNamespaceTopics2"; + std::string topicName3 = "persistent://prop/unit/ns4/basicGetNamespaceTopics3"; + // This is not in same namespace. 
+ std::string topicName4 = "persistent://prop/unit/ns2/basicGetNamespaceTopics4"; + + // call admin api to make topics partitioned + std::string url1 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions"; + std::string url2 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions"; + std::string url3 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + // 2. call getTopicsOfNamespaceAsync + ExecutorServiceProviderPtr service = boost::make_shared<ExecutorServiceProvider>(1); + AuthenticationPtr authData = AuthFactory::Disabled(); + ClientConfiguration conf; + ExecutorServiceProviderPtr ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1)); + ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); + BinaryProtoLookupService lookupService(pool_, url); + + TopicNamePtr topicName = TopicName::get(topicName1); + NamespaceNamePtr nsName = topicName->getNamespaceName(); + + Future<Result, NamespaceTopicsPtr> getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName); + NamespaceTopicsPtr topicsData; + result = getTopicsFuture.get(topicsData); + ASSERT_EQ(ResultOk, result); + ASSERT_TRUE(topicsData != NULL); + + // 3. 
verify the result contains the first 3 topics + ASSERT_EQ(topicsData->size(), 3); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end()); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end()); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end()); + + client.shutdown(); +}