This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d0d7e39 [feat] Support pattern subscribe non-persistent topic. (#207)
d0d7e39 is described below
commit d0d7e395e1755d1f19dc177a0cf779ffcdae3108
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 31 14:38:31 2023 +0800
[feat] Support pattern subscribe non-persistent topic. (#207)
* [feat] Support pattern subscribe non-persistent topic.
* Fix code reviews.
---
include/pulsar/ConsumerConfiguration.h | 14 ++++
include/pulsar/RegexSubscriptionMode.h | 42 ++++++++++++
lib/BinaryProtoLookupService.cc | 11 +--
lib/BinaryProtoLookupService.h | 7 +-
lib/ClientConnection.cc | 6 +-
lib/ClientConnection.h | 4 +-
lib/ClientImpl.cc | 36 ++++++++--
lib/ClientImpl.h | 7 +-
lib/Commands.cc | 4 +-
lib/Commands.h | 3 +-
lib/ConsumerConfiguration.cc | 10 +++
lib/ConsumerConfigurationImpl.h | 2 +
lib/HTTPLookupService.cc | 19 +++++-
lib/HTTPLookupService.h | 3 +-
lib/LookupService.h | 4 +-
lib/MultiTopicsConsumerImpl.h | 1 +
lib/PatternMultiTopicsConsumerImpl.cc | 20 +++---
lib/PatternMultiTopicsConsumerImpl.h | 4 +-
lib/ProtoApiEnums.h | 5 ++
lib/RetryableLookupService.h | 5 +-
lib/TopicName.cc | 12 ++++
lib/TopicName.h | 2 +
test-conf/standalone-ssl.conf | 4 ++
tests/ConsumerTest.cc | 97 ++++++++++++++++++++++++++
tests/LookupServiceTest.cc | 121 ++++++++++++++-------------------
tests/TopicNameTest.cc | 17 +++++
26 files changed, 352 insertions(+), 108 deletions(-)
diff --git a/include/pulsar/ConsumerConfiguration.h
b/include/pulsar/ConsumerConfiguration.h
index 0e6634d..8bd95da 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -27,6 +27,7 @@
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>
#include <pulsar/Message.h>
+#include <pulsar/RegexSubscriptionMode.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/TypedMessage.h>
@@ -383,6 +384,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
int getPatternAutoDiscoveryPeriod() const;
+ /**
+ * Determines which topics this consumer should be subscribed to -
Persistent, Non-Persistent, or
+ * AllTopics. Only used with pattern subscriptions.
+ *
+ * @param regexSubscriptionMode The default value is `PersistentOnly`.
+ */
+ ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode
regexSubscriptionMode);
+
+ /**
+ * @return the regex subscription mode for the pattern consumer.
+ */
+ RegexSubscriptionMode getRegexSubscriptionMode() const;
+
/**
* The default value is `InitialPositionLatest`.
*
diff --git a/include/pulsar/RegexSubscriptionMode.h
b/include/pulsar/RegexSubscriptionMode.h
new file mode 100644
index 0000000..2143c1c
--- /dev/null
+++ b/include/pulsar/RegexSubscriptionMode.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_REGEX_SUB_MODE_H
+#define PULSAR_CPP_REGEX_SUB_MODE_H
+
+namespace pulsar {
+enum RegexSubscriptionMode
+{
+ /**
+ * Only subscribe to persistent topics.
+ */
+ PersistentOnly = 0,
+
+ /**
+ * Only subscribe to non-persistent topics.
+ */
+ NonPersistentOnly = 1,
+
+ /**
+ * Subscribe to both persistent and non-persistent topics.
+ */
+ AllTopics = 2
+};
+}
+
+#endif // PULSAR_CPP_REGEX_SUB_MODE_H
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 502f7a8..0b23493 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -151,7 +151,7 @@ uint64_t BinaryProtoLookupService::newRequestId() {
}
Future<Result, NamespaceTopicsPtr>
BinaryProtoLookupService::getTopicsOfNamespaceAsync(
- const NamespaceNamePtr& nsName) {
+ const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result,
NamespaceTopicsPtr>>();
if (!nsName) {
promise->setFailed(ResultInvalidTopicName);
@@ -160,7 +160,7 @@ Future<Result, NamespaceTopicsPtr>
BinaryProtoLookupService::getTopicsOfNamespac
std::string namespaceName = nsName->toString();
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest,
this,
- namespaceName, std::placeholders::_1,
std::placeholders::_2, promise));
+ namespaceName, mode, std::placeholders::_1,
std::placeholders::_2, promise));
return promise->getFuture();
}
@@ -201,7 +201,9 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const
std::string& topicName
});
}
-void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const
std::string& nsName, Result result,
+void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const
std::string& nsName,
+
CommandGetTopicsOfNamespace_Mode mode,
+ Result result,
const
ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
@@ -212,8 +214,7 @@ void
BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << "
nsName: " << nsName);
-
- conn->newGetTopicsOfNamespace(nsName, requestId)
+ conn->newGetTopicsOfNamespace(nsName, mode, requestId)
.addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener,
this,
std::placeholders::_1, std::placeholders::_2,
promise));
}
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index cfac023..f8c91e6 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -49,7 +49,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override;
- Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) override;
+ Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+ const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override;
Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr&
topicName) override;
@@ -75,8 +76,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
const ClientConnectionWeakPtr&
clientCnx,
LookupDataResultPromisePtr promise);
- void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result
result,
- const ClientConnectionWeakPtr&
clientCnx,
+ void sendGetTopicsOfNamespaceRequest(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
+ Result result, const
ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);
void sendGetSchemaRequest(const std::string& topiName, Result result,
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 5837c50..81eb36d 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1293,8 +1293,8 @@ Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(u
return promise.getFuture();
}
-Future<Result, NamespaceTopicsPtr>
ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
-
uint64_t requestId) {
+Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
+ const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t
requestId) {
Lock lock(mutex_);
Promise<Result, NamespaceTopicsPtr> promise;
if (isClosed()) {
@@ -1306,7 +1306,7 @@ Future<Result, NamespaceTopicsPtr>
ClientConnection::newGetTopicsOfNamespace(con
pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId,
promise));
lock.unlock();
- sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
+ sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
return promise.getFuture();
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 95bb1ab..0476316 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -181,7 +181,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t
consumerId, uint64_t requestId);
- Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const
std::string& nsName, uint64_t requestId);
+ Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const
std::string& nsName,
+
CommandGetTopicsOfNamespace_Mode mode,
+ uint64_t
requestId);
Future<Result, boost::optional<SchemaInfo>> newGetSchema(const
std::string& topicName,
uint64_t
requestId);
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 3b9606a..38ac689 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -338,29 +338,53 @@ void ClientImpl::subscribeWithRegexAsync(const
std::string& regexPattern, const
}
}
- NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
+ if (TopicName::containsDomain(regexPattern)) {
+ LOG_WARN("Ignore invalid domain: "
+ << topicNamePtr->getDomain()
+ << ", use the RegexSubscriptionMode parameter to set the
topic type");
+ }
+
+ CommandGetTopicsOfNamespace_Mode mode;
+ auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
+ switch (regexSubscriptionMode) {
+ case PersistentOnly:
+ mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
+ break;
+ case NonPersistentOnly:
+ mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
+ break;
+ case AllTopics:
+ mode = CommandGetTopicsOfNamespace_Mode_ALL;
+ break;
+ default:
+ LOG_ERROR("RegexSubscriptionMode not valid: " <<
regexSubscriptionMode);
+ callback(ResultInvalidConfiguration, Consumer());
+ return;
+ }
- lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
- std::bind(&ClientImpl::createPatternMultiTopicsConsumer,
shared_from_this(), std::placeholders::_1,
- std::placeholders::_2, regexPattern, subscriptionName, conf,
callback));
+
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
mode)
+ .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer,
shared_from_this(),
+ std::placeholders::_1, std::placeholders::_2,
regexPattern, mode,
+ subscriptionName, conf, callback));
}
void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const
NamespaceTopicsPtr topics,
const std::string&
regexPattern,
+
CommandGetTopicsOfNamespace_Mode mode,
const std::string&
subscriptionName,
const ConsumerConfiguration&
conf,
SubscribeCallback callback) {
if (result == ResultOk) {
ConsumerImplBasePtr consumer;
- PULSAR_REGEX_NAMESPACE::regex pattern(regexPattern);
+ PULSAR_REGEX_NAMESPACE::regex
pattern(TopicName::removeDomain(regexPattern));
NamespaceTopicsPtr matchTopics =
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics,
pattern);
auto interceptors =
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
- consumer =
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(),
regexPattern,
+ consumer =
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(),
regexPattern, mode,
*matchTopics, subscriptionName, conf,
lookupServicePtr_, interceptors);
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index dd7ffdd..a7b36ff 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -28,6 +28,7 @@
#include "Future.h"
#include "LookupDataResult.h"
#include "MemoryLimitController.h"
+#include "ProtoApiEnums.h"
#include "ServiceNameResolver.h"
#include "SynchronizedHashMap.h"
@@ -151,8 +152,10 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void handleClose(Result result, SharedInt remaining, ResultCallback
callback);
void createPatternMultiTopicsConsumer(const Result result, const
NamespaceTopicsPtr topics,
- const std::string& regexPattern,
const std::string& consumerName,
- const ConsumerConfiguration& conf,
SubscribeCallback callback);
+ const std::string& regexPattern,
+ CommandGetTopicsOfNamespace_Mode
mode,
+ const std::string& consumerName,
const ConsumerConfiguration& conf,
+ SubscribeCallback callback);
enum State
{
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 829be57..a260b29 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -609,12 +609,14 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t
consumerId, uint64_t request
return buffer;
}
-SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
uint64_t requestId) {
+SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
+
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
CommandGetTopicsOfNamespace* getTopics =
cmd.mutable_gettopicsofnamespace();
getTopics->set_request_id(requestId);
getTopics->set_namespace_(nsName);
+
getTopics->set_mode(static_cast<proto::CommandGetTopicsOfNamespace_Mode>(mode));
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_gettopicsofnamespace();
diff --git a/lib/Commands.h b/lib/Commands.h
index 0331173..adf4fe0 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -156,7 +156,8 @@ class Commands {
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const
MessageId& messageId);
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId,
uint64_t timestamp);
static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t
requestId);
- static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
uint64_t requestId);
+ static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
+
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId);
static bool peerSupportsGetLastMessageId(int32_t peerVersion);
static bool peerSupportsActiveConsumerListener(int32_t peerVersion);
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 4764b46..044518a 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -317,4 +317,14 @@ ConsumerConfiguration&
ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei
bool ConsumerConfiguration::isAckReceiptEnabled() const { return
impl_->ackReceiptEnabled; }
+ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
+ RegexSubscriptionMode regexSubscriptionMode) {
+ impl_->regexSubscriptionMode = regexSubscriptionMode;
+ return *this;
+}
+
+RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const {
+ return impl_->regexSubscriptionMode;
+}
+
} // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index e84aa0a..20a1dae 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -48,6 +48,8 @@ struct ConsumerConfigurationImpl {
BatchReceivePolicy batchReceivePolicy{};
DeadLetterPolicy deadLetterPolicy;
int patternAutoDiscoveryPeriod{60};
+ RegexSubscriptionMode
regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly};
+
bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
std::map<std::string, std::string> subscriptionProperties;
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 0ce6839..920592d 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -126,17 +126,30 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync
}
Future<Result, NamespaceTopicsPtr>
HTTPLookupService::getTopicsOfNamespaceAsync(
- const NamespaceNamePtr &nsName) {
+ const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromise promise;
std::stringstream completeUrlStream;
+ auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) {
+ switch (mode) {
+ case CommandGetTopicsOfNamespace_Mode_PERSISTENT:
+ return "PERSISTENT";
+ case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT:
+ return "NON_PERSISTENT";
+ case CommandGetTopicsOfNamespace_Mode_ALL:
+ return "ALL";
+ default:
+ return "PERSISTENT";
+ }
+ };
+
const auto &url = serviceNameResolver_.resolveHost();
if (nsName->isV2()) {
completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' <<
nsName->toString() << '/'
- << "topics";
+ << "topics?mode=" << convertRegexSubMode(mode);
} else {
completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' <<
nsName->toString() << '/'
- << "destinations";
+ << "destinations?mode=" << convertRegexSubMode(mode);
}
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 9135677..8dc195c 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -79,7 +79,8 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr&
topicName) override;
- Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) override;
+ Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+ const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override;
};
} // namespace pulsar
diff --git a/lib/LookupService.h b/lib/LookupService.h
index f1c5de8..84dc37c 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -29,6 +29,7 @@
#include "Future.h"
#include "LookupDataResult.h"
+#include "ProtoApiEnums.h"
namespace pulsar {
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
@@ -72,7 +73,8 @@ class LookupService {
*
* Returns all the topics name for a given namespace.
*/
- virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) = 0;
+ virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+ const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
= 0;
/**
* returns current SchemaInfo {@link SchemaInfo} for a given topic.
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 35e1504..b00b0f2 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -180,6 +180,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
+ FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
};
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc
b/lib/PatternMultiTopicsConsumerImpl.cc
index 02a7703..e100a1c 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -28,13 +28,15 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
- ClientImplPtr client, const std::string pattern, const
std::vector<std::string>& topics,
- const std::string& subscriptionName, const ConsumerConfiguration& conf,
- const LookupServicePtr lookupServicePtr_, const ConsumerInterceptorsPtr
interceptors)
+ ClientImplPtr client, const std::string pattern,
CommandGetTopicsOfNamespace_Mode getTopicsMode,
+ const std::vector<std::string>& topics, const std::string&
subscriptionName,
+ const ConsumerConfiguration& conf, const LookupServicePtr
lookupServicePtr_,
+ const ConsumerInterceptorsPtr interceptors)
: MultiTopicsConsumerImpl(client, topics, subscriptionName,
TopicName::get(pattern), conf,
lookupServicePtr_, interceptors),
patternString_(pattern),
- pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
+
pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))),
+ getTopicsMode_(getTopicsMode),
autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
autoDiscoveryRunning_(false) {
namespaceName_ = TopicName::get(pattern)->getNamespaceName();
@@ -75,7 +77,7 @@ void
PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
// already get namespace from pattern.
assert(namespaceName_);
- lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_)
+ lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_,
getTopicsMode_)
.addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace,
this,
std::placeholders::_1, std::placeholders::_2));
}
@@ -193,10 +195,10 @@ void
PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedT
NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(
const std::vector<std::string>& topics, const
PULSAR_REGEX_NAMESPACE::regex& pattern) {
NamespaceTopicsPtr topicsResultPtr =
std::make_shared<std::vector<std::string>>();
-
- for (std::vector<std::string>::const_iterator itr = topics.begin(); itr !=
topics.end(); itr++) {
- if (PULSAR_REGEX_NAMESPACE::regex_match(*itr, pattern)) {
- topicsResultPtr->push_back(*itr);
+ for (const auto& topicStr : topics) {
+ auto topic = TopicName::removeDomain(topicStr);
+ if (PULSAR_REGEX_NAMESPACE::regex_match(topic, pattern)) {
+ topicsResultPtr->push_back(std::move(topicStr));
}
}
return topicsResultPtr;
diff --git a/lib/PatternMultiTopicsConsumerImpl.h
b/lib/PatternMultiTopicsConsumerImpl.h
index 87e301e..f13750a 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -48,6 +48,7 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
// when subscribe, client will first get all topics that match given
pattern.
// `topics` contains the topics that match `patternString`.
PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string
patternString,
+ CommandGetTopicsOfNamespace_Mode
getTopicsMode,
const std::vector<std::string>& topics,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
const LookupServicePtr lookupServicePtr_,
@@ -57,7 +58,7 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
void autoDiscoveryTimerTask(const boost::system::error_code& err);
- // filter input `topics` with given `pattern`, return matched topics
+ // filter input `topics` with given `pattern`, return matched topics. Do
not match topic domain.
static NamespaceTopicsPtr topicsPatternFilter(const
std::vector<std::string>& topics,
const
PULSAR_REGEX_NAMESPACE::regex& pattern);
@@ -72,6 +73,7 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
private:
const std::string patternString_;
const PULSAR_REGEX_NAMESPACE::regex pattern_;
+ const CommandGetTopicsOfNamespace_Mode getTopicsMode_;
typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr autoDiscoveryTimer_;
bool autoDiscoveryRunning_;
diff --git a/lib/ProtoApiEnums.h b/lib/ProtoApiEnums.h
index 1f1a79f..5f1876b 100644
--- a/lib/ProtoApiEnums.h
+++ b/lib/ProtoApiEnums.h
@@ -49,6 +49,11 @@ constexpr int CommandSubscribe_SubType_Shared = 1;
constexpr int CommandSubscribe_SubType_Failover = 2;
constexpr int CommandSubscribe_SubType_Key_Shared = 3;
+using CommandGetTopicsOfNamespace_Mode = int;
+constexpr int CommandGetTopicsOfNamespace_Mode_PERSISTENT = 0;
+constexpr int CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT = 1;
+constexpr int CommandGetTopicsOfNamespace_Mode_ALL = 2;
+
using CommandAck_ValidationError = int;
constexpr CommandAck_ValidationError
CommandAck_ValidationError_UncompressedSizeCorruption = 0;
constexpr CommandAck_ValidationError
CommandAck_ValidationError_DecompressionError = 1;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 4fc9a73..01dd82b 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -60,10 +60,11 @@ class RetryableLookupService : public LookupService,
[this, topicName] { return
lookupService_->getPartitionMetadataAsync(topicName); });
}
- Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) override {
+ Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+ const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override {
return executeAsync<NamespaceTopicsPtr>(
"get-topics-of-namespace-" + nsName->toString(),
- [this, nsName] { return
lookupService_->getTopicsOfNamespaceAsync(nsName); });
+ [this, nsName, mode] { return
lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
}
Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr&
topicName) override {
diff --git a/lib/TopicName.cc b/lib/TopicName.cc
index 48c52c4..5b892fc 100644
--- a/lib/TopicName.cc
+++ b/lib/TopicName.cc
@@ -256,4 +256,16 @@ int TopicName::getPartitionIndex(const std::string& topic)
{
NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; }
+std::string TopicName::removeDomain(const std::string& topicName) {
+ auto index = topicName.find("://");
+ if (index != std::string::npos) {
+ return topicName.substr(index + 3, topicName.length());
+ }
+ return topicName;
+}
+
+bool TopicName::containsDomain(const std::string& topicName) {
+ return topicName.find("://") != std::string::npos;
+}
+
} // namespace pulsar
diff --git a/lib/TopicName.h b/lib/TopicName.h
index 51f701f..8cc9cb5 100644
--- a/lib/TopicName.h
+++ b/lib/TopicName.h
@@ -67,6 +67,8 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId {
static std::shared_ptr<TopicName> get(const std::string& topicName);
bool operator==(const TopicName& other);
static std::string getEncodedName(const std::string& nameBeforeEncoding);
+ static std::string removeDomain(const std::string& topicName);
+ static bool containsDomain(const std::string& topicName);
std::string getTopicPartitionName(unsigned int partition) const;
static int getPartitionIndex(const std::string& topic);
diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf
index 1e54360..4b15007 100644
--- a/test-conf/standalone-ssl.conf
+++ b/test-conf/standalone-ssl.conf
@@ -19,6 +19,10 @@
### --- General broker settings --- ###
+# Disable system topic
+systemTopicEnabled=false
+topicLevelPoliciesEnabled=false
+
# Zookeeper quorum connection string
zookeeperServers=
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 0ecf82c..23ea687 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1025,6 +1025,103 @@ TEST(ConsumerTest,
testRedeliveryOfDecryptionFailedMessages) {
ASSERT_EQ(ResultOk, client.close());
}
+TEST(ConsumerTest, testPatternSubscribeTopic) {
+ Client client(lookupUrl);
+ auto topicName = "testPatternSubscribeTopic" +
std::to_string(time(nullptr));
+ std::string topicName1 = "persistent://public/default/" + topicName + "1";
+ std::string topicName2 = "persistent://public/default/" + topicName + "2";
+ std::string topicName3 = "non-persistent://public/default/" + topicName +
"3np";
+ // This will not match pattern
+ std::string topicName4 = "persistent://public/default/noMatch" + topicName;
+
+ // 0. trigger create topic
+ Producer producer1;
+ Result result = client.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer4;
+ result = client.createProducer(topicName4, producer4);
+ ASSERT_EQ(ResultOk, result);
+
+ // verify sub persistent and non-persistent topic
+ {
+ // 1. Use pattern to sub topic1, topic2, topic3
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+ consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::AllTopics);
+ Consumer consumer;
+ std::string pattern = "public/default/" + topicName + ".*";
+ ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, "sub-all",
consConfig, consumer));
+ auto multiConsumerImplPtr =
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+ ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 3);
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1));
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2));
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3));
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+
+ // 2. send msg to topic1, topic2, topic3, topic4
+ int messageNumber = 10;
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ auto content = "msg-content" + std::to_string(msgNum);
+ ASSERT_EQ(ResultOk,
producer1.send(MessageBuilder().setContent(content).build()));
+ ASSERT_EQ(ResultOk,
producer2.send(MessageBuilder().setContent(content).build()));
+ ASSERT_EQ(ResultOk,
producer3.send(MessageBuilder().setContent(content).build()));
+ ASSERT_EQ(ResultOk,
producer4.send(MessageBuilder().setContent(content).build()));
+ }
+
+ // 3. receive msg from topic1, topic2, topic3
+ Message m;
+ for (int i = 0; i < 3 * messageNumber; i++) {
+ ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+ }
+ // verify no more to receive, because producer4 not match pattern
+ ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ }
+
+ // verify only sub persistent topic
+ {
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+
consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::PersistentOnly);
+ Consumer consumer;
+ std::string pattern = "public/default/" + topicName + ".*";
+ ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern,
"sub-persistent", consConfig, consumer));
+ auto multiConsumerImplPtr =
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+ ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 2);
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1));
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2));
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName3));
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ }
+
+ // verify only sub non-persistent topic
+ {
+ ConsumerConfiguration consConfig;
+ consConfig.setConsumerType(ConsumerShared);
+
consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::NonPersistentOnly);
+ Consumer consumer;
+ std::string pattern = "public/default/" + topicName + ".*";
+ ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern,
"sub-non-persistent", consConfig, consumer));
+ auto multiConsumerImplPtr =
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+ ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 1);
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName1));
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName2));
+ ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3));
+ ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ }
+
+ client.close();
+}
+
class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
public:
void SetUp() override { producerConf_ =
ProducerConfiguration().setBatchingEnabled(GetParam()); }
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 9467727..61311f4 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -70,71 +70,6 @@ TEST(LookupServiceTest, basicLookup) {
ASSERT_EQ(url, lookupResult.physicalAddress);
}
-TEST(LookupServiceTest, basicGetNamespaceTopics) {
- std::string url = "pulsar://localhost:6650";
- std::string adminUrl = "http://localhost:8080/";
- Result result;
- // 1. create some topics under same namespace
- Client client(url);
-
- std::string topicName1 =
"persistent://public/default/basicGetNamespaceTopics1";
- std::string topicName2 =
"persistent://public/default/basicGetNamespaceTopics2";
- std::string topicName3 =
"persistent://public/default/basicGetNamespaceTopics3";
- // This is not in same namespace.
- std::string topicName4 =
"persistent://public/default-2/basicGetNamespaceTopics4";
-
- // call admin api to make topics partitioned
- std::string url1 = adminUrl +
"admin/v2/persistent/public/default/basicGetNamespaceTopics1/partitions";
- std::string url2 = adminUrl +
"admin/v2/persistent/public/default/basicGetNamespaceTopics2/partitions";
- std::string url3 = adminUrl +
"admin/v2/persistent/public/default/basicGetNamespaceTopics3/partitions";
-
- int res = makePutRequest(url1, "2");
- ASSERT_FALSE(res != 204 && res != 409);
- res = makePutRequest(url2, "3");
- ASSERT_FALSE(res != 204 && res != 409);
- res = makePutRequest(url3, "4");
- ASSERT_FALSE(res != 204 && res != 409);
-
- Producer producer1;
- result = client.createProducer(topicName1, producer1);
- ASSERT_EQ(ResultOk, result);
- Producer producer2;
- result = client.createProducer(topicName2, producer2);
- ASSERT_EQ(ResultOk, result);
- Producer producer3;
- result = client.createProducer(topicName3, producer3);
- ASSERT_EQ(ResultOk, result);
- Producer producer4;
- result = client.createProducer(topicName4, producer4);
- ASSERT_EQ(ResultOk, result);
-
- // 2. call getTopicsOfNamespaceAsync
- ExecutorServiceProviderPtr service =
std::make_shared<ExecutorServiceProvider>(1);
- AuthenticationPtr authData = AuthFactory::Disabled();
- ClientConfiguration conf;
- ExecutorServiceProviderPtr
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
- ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
- ServiceNameResolver serviceNameResolver(url);
- BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
-
- TopicNamePtr topicName = TopicName::get(topicName1);
- NamespaceNamePtr nsName = topicName->getNamespaceName();
-
- Future<Result, NamespaceTopicsPtr> getTopicsFuture =
lookupService.getTopicsOfNamespaceAsync(nsName);
- NamespaceTopicsPtr topicsData;
- result = getTopicsFuture.get(topicsData);
- ASSERT_EQ(ResultOk, result);
- ASSERT_TRUE(topicsData != NULL);
-
- // 3. verify result contains first 3 topic
- ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1)
!= topicsData->end());
- ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2)
!= topicsData->end());
- ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3)
!= topicsData->end());
- ASSERT_FALSE(std::find(topicsData->begin(), topicsData->end(), topicName4)
!= topicsData->end());
-
- client.shutdown();
-}
-
static void testMultiAddresses(LookupService& lookupService) {
std::vector<Result> results;
constexpr int numRequests = 6;
@@ -166,8 +101,10 @@ static void testMultiAddresses(LookupService&
lookupService) {
results.clear();
for (int i = 0; i < numRequests; i++) {
NamespaceTopicsPtr data;
- const auto result =
-
lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data);
+ const auto result = lookupService
+
.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName(),
+
CommandGetTopicsOfNamespace_Mode_PERSISTENT)
+ .get(data);
LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result);
results.emplace_back(result);
}
@@ -212,7 +149,8 @@ TEST(LookupServiceTest, testRetry) {
LOG_INFO("getPartitionMetadataAsync returns " <<
lookupDataResultPtr->getPartitions() << " partitions");
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
- auto future3 =
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+ auto future3 =
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
+
CommandGetTopicsOfNamespace_Mode_PERSISTENT);
NamespaceTopicsPtr namespaceTopicsPtr;
ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
LOG_INFO("getTopicPartitionName Async returns " <<
namespaceTopicsPtr->size() << " topics");
@@ -273,7 +211,8 @@ TEST(LookupServiceTest, testTimeout) {
afterMethod("getPartitionMetadataAsync");
beforeMethod();
- auto future3 =
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+ auto future3 =
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
+
CommandGetTopicsOfNamespace_Mode_PERSISTENT);
NamespaceTopicsPtr namespaceTopicsPtr;
ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
afterMethod("getTopicsOfNamespaceAsync");
@@ -289,6 +228,50 @@ class LookupServiceTest : public
::testing::TestWithParam<std::string> {
Client client_{GetParam()};
};
+TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
+ Result result;
+
+ auto nsName = NamespaceName::get("public", GetParam().substr(0, 4) +
std::to_string(time(nullptr)));
+ std::string topicName1 = "persistent://" + nsName->toString() +
"/basicGetNamespaceTopics1";
+ std::string topicName2 = "persistent://" + nsName->toString() +
"/basicGetNamespaceTopics2";
+ std::string topicName3 = "non-persistent://" + nsName->toString() +
"/basicGetNamespaceTopics3";
+
+ // 0. create a namespace
+ auto createNsUrl = httpLookupUrl + "/admin/v2/namespaces/" +
nsName->toString();
+ auto res = makePutRequest(createNsUrl, "");
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ // 1. trigger auto create topic
+ Producer producer1;
+ result = client_.createProducer(topicName1, producer1);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer2;
+ result = client_.createProducer(topicName2, producer2);
+ ASSERT_EQ(ResultOk, result);
+ Producer producer3;
+ result = client_.createProducer(topicName3, producer3);
+ ASSERT_EQ(ResultOk, result);
+
+ // 2. verify getTopicsOfNamespace by regex mode.
+ auto lookupServicePtr =
PulsarFriend::getClientImplPtr(client_)->getLookup();
+ auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode,
+ const std::set<std::string>& expectedTopics) {
+ Future<Result, NamespaceTopicsPtr> getTopicsFuture =
+ lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode);
+ NamespaceTopicsPtr topicsData;
+ result = getTopicsFuture.get(topicsData);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_TRUE(topicsData != NULL);
+ std::set<std::string> actualTopics(topicsData->begin(),
topicsData->end());
+ ASSERT_EQ(expectedTopics, actualTopics);
+ };
+ verifyGetTopics(CommandGetTopicsOfNamespace_Mode_PERSISTENT, {topicName1,
topicName2});
+ verifyGetTopics(CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT,
{topicName3});
+ verifyGetTopics(CommandGetTopicsOfNamespace_Mode_ALL, {topicName1,
topicName2, topicName3});
+
+ client_.close();
+}
+
TEST_P(LookupServiceTest, testGetSchema) {
const std::string topic = "testGetSchema" + std::to_string(time(nullptr))
+ GetParam().substr(0, 4);
std::string jsonSchema =
diff --git a/tests/TopicNameTest.cc b/tests/TopicNameTest.cc
index 41838f5..74dc605 100644
--- a/tests/TopicNameTest.cc
+++ b/tests/TopicNameTest.cc
@@ -191,3 +191,20 @@ TEST(TopicNameTest, testPartitionIndex) {
ASSERT_EQ(topicName->getPartitionIndex(), partition);
}
}
+
+TEST(TopicNameTest, testRemoveDomain) {
+ auto topicName1 = "persistent://public/default/test-topic";
+ ASSERT_EQ("public/default/test-topic",
TopicName::removeDomain(topicName1));
+
+ auto topicName2 = "non-persistent://public/default/test-topic";
+ ASSERT_EQ("public/default/test-topic",
TopicName::removeDomain(topicName2));
+
+ auto topicName3 = "public/default/test-topic";
+ ASSERT_EQ(topicName3, TopicName::removeDomain(topicName2));
+}
+
+TEST(TopicNameTest, testContainsDomain) {
+
ASSERT_TRUE(TopicName::containsDomain("persistent://public/default/test-topic"));
+
ASSERT_TRUE(TopicName::containsDomain("non-persistent://public/default/test-topic"));
+ ASSERT_FALSE(TopicName::containsDomain("public/default/test-topic"));
+}