This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d0d7e39  [feat] Support pattern subscribe non-persistent topic. (#207)
d0d7e39 is described below

commit d0d7e395e1755d1f19dc177a0cf779ffcdae3108
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 31 14:38:31 2023 +0800

    [feat] Support pattern subscribe non-persistent topic. (#207)
    
    * [feat] Support pattern subscribe non-persistent topic.
    
    * Fix code reviews.
---
 include/pulsar/ConsumerConfiguration.h |  14 ++++
 include/pulsar/RegexSubscriptionMode.h |  42 ++++++++++++
 lib/BinaryProtoLookupService.cc        |  11 +--
 lib/BinaryProtoLookupService.h         |   7 +-
 lib/ClientConnection.cc                |   6 +-
 lib/ClientConnection.h                 |   4 +-
 lib/ClientImpl.cc                      |  36 ++++++++--
 lib/ClientImpl.h                       |   7 +-
 lib/Commands.cc                        |   4 +-
 lib/Commands.h                         |   3 +-
 lib/ConsumerConfiguration.cc           |  10 +++
 lib/ConsumerConfigurationImpl.h        |   2 +
 lib/HTTPLookupService.cc               |  19 +++++-
 lib/HTTPLookupService.h                |   3 +-
 lib/LookupService.h                    |   4 +-
 lib/MultiTopicsConsumerImpl.h          |   1 +
 lib/PatternMultiTopicsConsumerImpl.cc  |  20 +++---
 lib/PatternMultiTopicsConsumerImpl.h   |   4 +-
 lib/ProtoApiEnums.h                    |   5 ++
 lib/RetryableLookupService.h           |   5 +-
 lib/TopicName.cc                       |  12 ++++
 lib/TopicName.h                        |   2 +
 test-conf/standalone-ssl.conf          |   4 ++
 tests/ConsumerTest.cc                  |  97 ++++++++++++++++++++++++++
 tests/LookupServiceTest.cc             | 121 ++++++++++++++-------------------
 tests/TopicNameTest.cc                 |  17 +++++
 26 files changed, 352 insertions(+), 108 deletions(-)

diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 0e6634d..8bd95da 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -27,6 +27,7 @@
 #include <pulsar/InitialPosition.h>
 #include <pulsar/KeySharedPolicy.h>
 #include <pulsar/Message.h>
+#include <pulsar/RegexSubscriptionMode.h>
 #include <pulsar/Result.h>
 #include <pulsar/Schema.h>
 #include <pulsar/TypedMessage.h>
@@ -383,6 +384,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     int getPatternAutoDiscoveryPeriod() const;
 
+    /**
+     * Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or
+     * AllTopics. Only used with pattern subscriptions.
+     *
+     * @param regexSubscriptionMode The default value is `PersistentOnly`.
+     */
+    ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode);
+
+    /**
+     * @return the regex subscription mode for the pattern consumer.
+     */
+    RegexSubscriptionMode getRegexSubscriptionMode() const;
+
     /**
      * The default value is `InitialPositionLatest`.
      *
diff --git a/include/pulsar/RegexSubscriptionMode.h b/include/pulsar/RegexSubscriptionMode.h
new file mode 100644
index 0000000..2143c1c
--- /dev/null
+++ b/include/pulsar/RegexSubscriptionMode.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_REGEX_SUB_MODE_H
+#define PULSAR_CPP_REGEX_SUB_MODE_H
+
+namespace pulsar {
+enum RegexSubscriptionMode
+{
+    /**
+     * Only subscribe to persistent topics.
+     */
+    PersistentOnly = 0,
+
+    /**
+     * Only subscribe to non-persistent topics.
+     */
+    NonPersistentOnly = 1,
+
+    /**
+     * Subscribe to both persistent and non-persistent topics.
+     */
+    AllTopics = 2
+};
+}
+
+#endif  // PULSAR_CPP_REGEX_SUB_MODE_H
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 502f7a8..0b23493 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -151,7 +151,7 @@ uint64_t BinaryProtoLookupService::newRequestId() {
 }
 
 Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
-    const NamespaceNamePtr& nsName) {
+    const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
     NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
     if (!nsName) {
         promise->setFailed(ResultInvalidTopicName);
@@ -160,7 +160,7 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
     std::string namespaceName = nsName->toString();
     cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
         
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest,
 this,
-                               namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
+                               namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise));
     return promise->getFuture();
 }
 
@@ -201,7 +201,9 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const 
std::string& topicName
         });
 }
 
-void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName, Result result,
+void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName,
+                                                               
CommandGetTopicsOfNamespace_Mode mode,
+                                                               Result result,
                                                                const 
ClientConnectionWeakPtr& clientCnx,
                                                                
NamespaceTopicsPromisePtr promise) {
     if (result != ResultOk) {
@@ -212,8 +214,7 @@ void 
BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
     ClientConnectionPtr conn = clientCnx.lock();
     uint64_t requestId = newRequestId();
     LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);
-
-    conn->newGetTopicsOfNamespace(nsName, requestId)
+    conn->newGetTopicsOfNamespace(nsName, mode, requestId)
         
.addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, 
this,
                                std::placeholders::_1, std::placeholders::_2, 
promise));
 }
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index cfac023..f8c91e6 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -49,7 +49,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override;
 
-    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+        const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
 
     Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
 
@@ -75,8 +76,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
                                        const ClientConnectionWeakPtr& 
clientCnx,
                                        LookupDataResultPromisePtr promise);
 
-    void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result 
result,
-                                         const ClientConnectionWeakPtr& 
clientCnx,
+    void sendGetTopicsOfNamespaceRequest(const std::string& nsName, 
CommandGetTopicsOfNamespace_Mode mode,
+                                         Result result, const 
ClientConnectionWeakPtr& clientCnx,
                                          NamespaceTopicsPromisePtr promise);
 
     void sendGetSchemaRequest(const std::string& topiName, Result result,
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 5837c50..81eb36d 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1293,8 +1293,8 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
     return promise.getFuture();
 }
 
-Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
-                                                                             
uint64_t requestId) {
+Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
+    const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
     Lock lock(mutex_);
     Promise<Result, NamespaceTopicsPtr> promise;
     if (isClosed()) {
@@ -1306,7 +1306,7 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(con
 
     pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
+    sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
     return promise.getFuture();
 }
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 95bb1ab..0476316 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -181,7 +181,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t 
consumerId, uint64_t requestId);
 
-    Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName, uint64_t requestId);
+    Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName,
+                                                               
CommandGetTopicsOfNamespace_Mode mode,
+                                                               uint64_t 
requestId);
 
     Future<Result, boost::optional<SchemaInfo>> newGetSchema(const 
std::string& topicName,
                                                              uint64_t 
requestId);
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 3b9606a..38ac689 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -338,29 +338,53 @@ void ClientImpl::subscribeWithRegexAsync(const 
std::string& regexPattern, const
         }
     }
 
-    NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
+    if (TopicName::containsDomain(regexPattern)) {
+        LOG_WARN("Ignore invalid domain: "
+                 << topicNamePtr->getDomain()
+                 << ", use the RegexSubscriptionMode parameter to set the topic type");
+    }
+
+    CommandGetTopicsOfNamespace_Mode mode;
+    auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
+    switch (regexSubscriptionMode) {
+        case PersistentOnly:
+            mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
+            break;
+        case NonPersistentOnly:
+            mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
+            break;
+        case AllTopics:
+            mode = CommandGetTopicsOfNamespace_Mode_ALL;
+            break;
+        default:
+            LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode);
+            callback(ResultInvalidConfiguration, Consumer());
+            return;
+    }
 
-    lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
-        std::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(), std::placeholders::_1,
-                  std::placeholders::_2, regexPattern, subscriptionName, conf, 
callback));
+    
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), 
mode)
+        .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(),
+                               std::placeholders::_1, std::placeholders::_2, 
regexPattern, mode,
+                               subscriptionName, conf, callback));
 }
 
 void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
                                                   const std::string& 
regexPattern,
+                                                  
CommandGetTopicsOfNamespace_Mode mode,
                                                   const std::string& 
subscriptionName,
                                                   const ConsumerConfiguration& 
conf,
                                                   SubscribeCallback callback) {
     if (result == ResultOk) {
         ConsumerImplBasePtr consumer;
 
-        PULSAR_REGEX_NAMESPACE::regex pattern(regexPattern);
+        PULSAR_REGEX_NAMESPACE::regex pattern(TopicName::removeDomain(regexPattern));
 
         NamespaceTopicsPtr matchTopics =
             PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, 
pattern);
 
         auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
 
-        consumer = 
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), 
regexPattern,
+        consumer = 
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), 
regexPattern, mode,
                                                                     
*matchTopics, subscriptionName, conf,
                                                                     
lookupServicePtr_, interceptors);
 
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index dd7ffdd..a7b36ff 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -28,6 +28,7 @@
 #include "Future.h"
 #include "LookupDataResult.h"
 #include "MemoryLimitController.h"
+#include "ProtoApiEnums.h"
 #include "ServiceNameResolver.h"
 #include "SynchronizedHashMap.h"
 
@@ -151,8 +152,10 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     void handleClose(Result result, SharedInt remaining, ResultCallback 
callback);
 
     void createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
-                                          const std::string& regexPattern, 
const std::string& consumerName,
-                                          const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+                                          const std::string& regexPattern,
+                                          CommandGetTopicsOfNamespace_Mode 
mode,
+                                          const std::string& consumerName, 
const ConsumerConfiguration& conf,
+                                          SubscribeCallback callback);
 
     enum State
     {
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 829be57..a260b29 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -609,12 +609,14 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t 
consumerId, uint64_t request
     return buffer;
 }
 
-SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId) {
+SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
+                                               
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
     CommandGetTopicsOfNamespace* getTopics = 
cmd.mutable_gettopicsofnamespace();
     getTopics->set_request_id(requestId);
     getTopics->set_namespace_(nsName);
+    getTopics->set_mode(static_cast<proto::CommandGetTopicsOfNamespace_Mode>(mode));
 
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_gettopicsofnamespace();
diff --git a/lib/Commands.h b/lib/Commands.h
index 0331173..adf4fe0 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -156,7 +156,8 @@ class Commands {
     static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const 
MessageId& messageId);
     static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, 
uint64_t timestamp);
     static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t 
requestId);
-    static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId);
+    static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
+                                                
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId);
 
     static bool peerSupportsGetLastMessageId(int32_t peerVersion);
     static bool peerSupportsActiveConsumerListener(int32_t peerVersion);
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 4764b46..044518a 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -317,4 +317,14 @@ ConsumerConfiguration& 
ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei
 
 bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; }
 
+ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
+    RegexSubscriptionMode regexSubscriptionMode) {
+    impl_->regexSubscriptionMode = regexSubscriptionMode;
+    return *this;
+}
+
+RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const {
+    return impl_->regexSubscriptionMode;
+}
+
 }  // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index e84aa0a..20a1dae 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -48,6 +48,8 @@ struct ConsumerConfigurationImpl {
     BatchReceivePolicy batchReceivePolicy{};
     DeadLetterPolicy deadLetterPolicy;
     int patternAutoDiscoveryPeriod{60};
+    RegexSubscriptionMode regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly};
+
     bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
     std::map<std::string, std::string> subscriptionProperties;
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 0ce6839..920592d 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -126,17 +126,30 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync
 }
 
 Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
-    const NamespaceNamePtr &nsName) {
+    const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) {
     NamespaceTopicsPromise promise;
     std::stringstream completeUrlStream;
 
+    auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) {
+        switch (mode) {
+            case CommandGetTopicsOfNamespace_Mode_PERSISTENT:
+                return "PERSISTENT";
+            case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT:
+                return "NON_PERSISTENT";
+            case CommandGetTopicsOfNamespace_Mode_ALL:
+                return "ALL";
+            default:
+                return "PERSISTENT";
+        }
+    };
+
     const auto &url = serviceNameResolver_.resolveHost();
     if (nsName->isV2()) {
         completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << 
nsName->toString() << '/'
-                          << "topics";
+                          << "topics?mode=" << convertRegexSubMode(mode);
     } else {
         completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << 
nsName->toString() << '/'
-                          << "destinations";
+                          << "destinations?mode=" << convertRegexSubMode(mode);
     }
 
     
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 9135677..8dc195c 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -79,7 +79,8 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
 
     Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
 
-    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) override;
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+        const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override;
 };
 }  // namespace pulsar
 
diff --git a/lib/LookupService.h b/lib/LookupService.h
index f1c5de8..84dc37c 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -29,6 +29,7 @@
 
 #include "Future.h"
 #include "LookupDataResult.h"
+#include "ProtoApiEnums.h"
 
 namespace pulsar {
 using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
@@ -72,7 +73,8 @@ class LookupService {
      *
      * Returns all the topics name for a given namespace.
      */
-    virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0;
+    virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+        const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0;
 
     /**
      * returns current SchemaInfo {@link SchemaInfo} for a given topic.
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 35e1504..b00b0f2 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -180,6 +180,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
+    FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
 };
 
 typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 02a7703..e100a1c 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -28,13 +28,15 @@ DECLARE_LOG_OBJECT()
 using namespace pulsar;
 
 PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
-    ClientImplPtr client, const std::string pattern, const 
std::vector<std::string>& topics,
-    const std::string& subscriptionName, const ConsumerConfiguration& conf,
-    const LookupServicePtr lookupServicePtr_, const ConsumerInterceptorsPtr 
interceptors)
+    ClientImplPtr client, const std::string pattern, 
CommandGetTopicsOfNamespace_Mode getTopicsMode,
+    const std::vector<std::string>& topics, const std::string& 
subscriptionName,
+    const ConsumerConfiguration& conf, const LookupServicePtr 
lookupServicePtr_,
+    const ConsumerInterceptorsPtr interceptors)
     : MultiTopicsConsumerImpl(client, topics, subscriptionName, 
TopicName::get(pattern), conf,
                               lookupServicePtr_, interceptors),
       patternString_(pattern),
-      pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
+      
pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))),
+      getTopicsMode_(getTopicsMode),
       
autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
       autoDiscoveryRunning_(false) {
     namespaceName_ = TopicName::get(pattern)->getNamespaceName();
@@ -75,7 +77,7 @@ void 
PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
     // already get namespace from pattern.
     assert(namespaceName_);
 
-    lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_)
+    lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, 
getTopicsMode_)
         
.addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace,
 this,
                                std::placeholders::_1, std::placeholders::_2));
 }
@@ -193,10 +195,10 @@ void 
PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedT
 NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(
     const std::vector<std::string>& topics, const 
PULSAR_REGEX_NAMESPACE::regex& pattern) {
     NamespaceTopicsPtr topicsResultPtr = 
std::make_shared<std::vector<std::string>>();
-
-    for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != 
topics.end(); itr++) {
-        if (PULSAR_REGEX_NAMESPACE::regex_match(*itr, pattern)) {
-            topicsResultPtr->push_back(*itr);
+    for (const auto& topicStr : topics) {
+        auto topic = TopicName::removeDomain(topicStr);
+        if (PULSAR_REGEX_NAMESPACE::regex_match(topic, pattern)) {
+            topicsResultPtr->push_back(std::move(topicStr));
         }
     }
     return topicsResultPtr;
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index 87e301e..f13750a 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -48,6 +48,7 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
     // when subscribe, client will first get all topics that match given 
pattern.
     // `topics` contains the topics that match `patternString`.
     PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string 
patternString,
+                                   CommandGetTopicsOfNamespace_Mode 
getTopicsMode,
                                    const std::vector<std::string>& topics,
                                    const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                                    const LookupServicePtr lookupServicePtr_,
@@ -57,7 +58,7 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
 
     void autoDiscoveryTimerTask(const boost::system::error_code& err);
 
-    // filter input `topics` with given `pattern`, return matched topics
+    // filter input `topics` with given `pattern`, return matched topics. Do not match topic domain.
     static NamespaceTopicsPtr topicsPatternFilter(const 
std::vector<std::string>& topics,
                                                   const 
PULSAR_REGEX_NAMESPACE::regex& pattern);
 
@@ -72,6 +73,7 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
    private:
     const std::string patternString_;
     const PULSAR_REGEX_NAMESPACE::regex pattern_;
+    const CommandGetTopicsOfNamespace_Mode getTopicsMode_;
     typedef std::shared_ptr<boost::asio::deadline_timer> TimerPtr;
     TimerPtr autoDiscoveryTimer_;
     bool autoDiscoveryRunning_;
diff --git a/lib/ProtoApiEnums.h b/lib/ProtoApiEnums.h
index 1f1a79f..5f1876b 100644
--- a/lib/ProtoApiEnums.h
+++ b/lib/ProtoApiEnums.h
@@ -49,6 +49,11 @@ constexpr int CommandSubscribe_SubType_Shared = 1;
 constexpr int CommandSubscribe_SubType_Failover = 2;
 constexpr int CommandSubscribe_SubType_Key_Shared = 3;
 
+using CommandGetTopicsOfNamespace_Mode = int;
+constexpr int CommandGetTopicsOfNamespace_Mode_PERSISTENT = 0;
+constexpr int CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT = 1;
+constexpr int CommandGetTopicsOfNamespace_Mode_ALL = 2;
+
 using CommandAck_ValidationError = int;
 constexpr CommandAck_ValidationError 
CommandAck_ValidationError_UncompressedSizeCorruption = 0;
 constexpr CommandAck_ValidationError 
CommandAck_ValidationError_DecompressionError = 1;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 4fc9a73..01dd82b 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -60,10 +60,11 @@ class RetryableLookupService : public LookupService,
             [this, topicName] { return 
lookupService_->getPartitionMetadataAsync(topicName); });
     }
 
-    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) override {
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
+        const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override {
         return executeAsync<NamespaceTopicsPtr>(
             "get-topics-of-namespace-" + nsName->toString(),
-            [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); });
+            [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
     }
 
     Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override {
diff --git a/lib/TopicName.cc b/lib/TopicName.cc
index 48c52c4..5b892fc 100644
--- a/lib/TopicName.cc
+++ b/lib/TopicName.cc
@@ -256,4 +256,16 @@ int TopicName::getPartitionIndex(const std::string& topic) {
 
 NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; }
 
+std::string TopicName::removeDomain(const std::string& topicName) {
+    auto index = topicName.find("://");
+    if (index != std::string::npos) {
+        return topicName.substr(index + 3, topicName.length());
+    }
+    return topicName;
+}
+
+bool TopicName::containsDomain(const std::string& topicName) {
+    return topicName.find("://") != std::string::npos;
+}
+
 }  // namespace pulsar
diff --git a/lib/TopicName.h b/lib/TopicName.h
index 51f701f..8cc9cb5 100644
--- a/lib/TopicName.h
+++ b/lib/TopicName.h
@@ -67,6 +67,8 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId {
     static std::shared_ptr<TopicName> get(const std::string& topicName);
     bool operator==(const TopicName& other);
     static std::string getEncodedName(const std::string& nameBeforeEncoding);
+    static std::string removeDomain(const std::string& topicName);
+    static bool containsDomain(const std::string& topicName);
     std::string getTopicPartitionName(unsigned int partition) const;
     static int getPartitionIndex(const std::string& topic);
 
diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf
index 1e54360..4b15007 100644
--- a/test-conf/standalone-ssl.conf
+++ b/test-conf/standalone-ssl.conf
@@ -19,6 +19,10 @@
 
 ### --- General broker settings --- ###
 
+# Disable system topic
+systemTopicEnabled=false
+topicLevelPoliciesEnabled=false
+
 # Zookeeper quorum connection string
 zookeeperServers=
 
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 0ecf82c..23ea687 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1025,6 +1025,103 @@ TEST(ConsumerTest, 
testRedeliveryOfDecryptionFailedMessages) {
     ASSERT_EQ(ResultOk, client.close());
 }
 
+TEST(ConsumerTest, testPatternSubscribeTopic) {  // regex subscribe under AllTopics, PersistentOnly and NonPersistentOnly
+    Client client(lookupUrl);
+    auto topicName = "testPatternSubscribeTopic" + 
std::to_string(time(nullptr));  // base name suffixed with the current time(nullptr) value
+    std::string topicName1 = "persistent://public/default/" + topicName + "1";
+    std::string topicName2 = "persistent://public/default/" + topicName + "2";
+    std::string topicName3 = "non-persistent://public/default/" + topicName + 
"3np";
+    // This will not match pattern
+    std::string topicName4 = "persistent://public/default/noMatch" + topicName;
+
+    // 0. trigger create topic
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer4;
+    result = client.createProducer(topicName4, producer4);
+    ASSERT_EQ(ResultOk, result);
+
+    // verify sub persistent and non-persistent topic
+    {
+        // 1. Use pattern to sub topic1, topic2, topic3
+        ConsumerConfiguration consConfig;
+        consConfig.setConsumerType(ConsumerShared);
+        consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::AllTopics);
+        Consumer consumer;
+        std::string pattern = "public/default/" + topicName + ".*";
+        ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, "sub-all", 
consConfig, consumer));
+        auto multiConsumerImplPtr = 
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 3);  // topicName1, topicName2 and topicName3 only
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1));
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2));
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3));
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+
+        // 2. send msg to topic1, topic2, topic3, topic4
+        int messageNumber = 10;
+        for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+            auto content = "msg-content" + std::to_string(msgNum);
+            ASSERT_EQ(ResultOk, 
producer1.send(MessageBuilder().setContent(content).build()));
+            ASSERT_EQ(ResultOk, 
producer2.send(MessageBuilder().setContent(content).build()));
+            ASSERT_EQ(ResultOk, 
producer3.send(MessageBuilder().setContent(content).build()));
+            ASSERT_EQ(ResultOk, 
producer4.send(MessageBuilder().setContent(content).build()));
+        }
+
+        // 3. receive msg from topic1, topic2, topic3
+        Message m;
+        for (int i = 0; i < 3 * messageNumber; i++) {  // 3 subscribed topics x messageNumber messages each
+            ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+            ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+        }
+        // verify no more to receive, because producer4 not match pattern
+        ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+        ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    }
+
+    // verify only sub persistent topic
+    {
+        ConsumerConfiguration consConfig;
+        consConfig.setConsumerType(ConsumerShared);
+        
consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::PersistentOnly);
+        Consumer consumer;
+        std::string pattern = "public/default/" + topicName + ".*";
+        ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, 
"sub-persistent", consConfig, consumer));
+        auto multiConsumerImplPtr = 
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 2);  // topicName1 and topicName2 only
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1));
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2));
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName3));
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+        ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    }
+
+    // verify only sub non-persistent topic
+    {
+        ConsumerConfiguration consConfig;
+        consConfig.setConsumerType(ConsumerShared);
+        
consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::NonPersistentOnly);
+        Consumer consumer;
+        std::string pattern = "public/default/" + topicName + ".*";
+        ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, 
"sub-non-persistent", consConfig, consumer));
+        auto multiConsumerImplPtr = 
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+        ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 1);  // topicName3 only
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName1));
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName2));
+        ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3));
+        ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4));
+        ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    }
+
+    client.close();
+}
+
 class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
    public:
     void SetUp() override { producerConf_ = 
ProducerConfiguration().setBatchingEnabled(GetParam()); }
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 9467727..61311f4 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -70,71 +70,6 @@ TEST(LookupServiceTest, basicLookup) {
     ASSERT_EQ(url, lookupResult.physicalAddress);
 }
 
-TEST(LookupServiceTest, basicGetNamespaceTopics) {
-    std::string url = "pulsar://localhost:6650";
-    std::string adminUrl = "http://localhost:8080/";;
-    Result result;
-    // 1. create some topics under same namespace
-    Client client(url);
-
-    std::string topicName1 = 
"persistent://public/default/basicGetNamespaceTopics1";
-    std::string topicName2 = 
"persistent://public/default/basicGetNamespaceTopics2";
-    std::string topicName3 = 
"persistent://public/default/basicGetNamespaceTopics3";
-    // This is not in same namespace.
-    std::string topicName4 = 
"persistent://public/default-2/basicGetNamespaceTopics4";
-
-    // call admin api to make topics partitioned
-    std::string url1 = adminUrl + 
"admin/v2/persistent/public/default/basicGetNamespaceTopics1/partitions";
-    std::string url2 = adminUrl + 
"admin/v2/persistent/public/default/basicGetNamespaceTopics2/partitions";
-    std::string url3 = adminUrl + 
"admin/v2/persistent/public/default/basicGetNamespaceTopics3/partitions";
-
-    int res = makePutRequest(url1, "2");
-    ASSERT_FALSE(res != 204 && res != 409);
-    res = makePutRequest(url2, "3");
-    ASSERT_FALSE(res != 204 && res != 409);
-    res = makePutRequest(url3, "4");
-    ASSERT_FALSE(res != 204 && res != 409);
-
-    Producer producer1;
-    result = client.createProducer(topicName1, producer1);
-    ASSERT_EQ(ResultOk, result);
-    Producer producer2;
-    result = client.createProducer(topicName2, producer2);
-    ASSERT_EQ(ResultOk, result);
-    Producer producer3;
-    result = client.createProducer(topicName3, producer3);
-    ASSERT_EQ(ResultOk, result);
-    Producer producer4;
-    result = client.createProducer(topicName4, producer4);
-    ASSERT_EQ(ResultOk, result);
-
-    // 2.  call getTopicsOfNamespaceAsync
-    ExecutorServiceProviderPtr service = 
std::make_shared<ExecutorServiceProvider>(1);
-    AuthenticationPtr authData = AuthFactory::Disabled();
-    ClientConfiguration conf;
-    ExecutorServiceProviderPtr 
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
-    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
-    ServiceNameResolver serviceNameResolver(url);
-    BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
-
-    TopicNamePtr topicName = TopicName::get(topicName1);
-    NamespaceNamePtr nsName = topicName->getNamespaceName();
-
-    Future<Result, NamespaceTopicsPtr> getTopicsFuture = 
lookupService.getTopicsOfNamespaceAsync(nsName);
-    NamespaceTopicsPtr topicsData;
-    result = getTopicsFuture.get(topicsData);
-    ASSERT_EQ(ResultOk, result);
-    ASSERT_TRUE(topicsData != NULL);
-
-    // 3. verify result contains first 3 topic
-    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) 
!= topicsData->end());
-    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) 
!= topicsData->end());
-    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) 
!= topicsData->end());
-    ASSERT_FALSE(std::find(topicsData->begin(), topicsData->end(), topicName4) 
!= topicsData->end());
-
-    client.shutdown();
-}
-
 static void testMultiAddresses(LookupService& lookupService) {
     std::vector<Result> results;
     constexpr int numRequests = 6;
@@ -166,8 +101,10 @@ static void testMultiAddresses(LookupService& 
lookupService) {
     results.clear();
     for (int i = 0; i < numRequests; i++) {
         NamespaceTopicsPtr data;
-        const auto result =
-            
lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data);
+        const auto result = lookupService
+                                
.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName(),
+                                                           
CommandGetTopicsOfNamespace_Mode_PERSISTENT)
+                                .get(data);
         LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result);
         results.emplace_back(result);
     }
@@ -212,7 +149,8 @@ TEST(LookupServiceTest, testRetry) {
     LOG_INFO("getPartitionMetadataAsync returns " << 
lookupDataResultPtr->getPartitions() << " partitions");
 
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
-    auto future3 = 
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    auto future3 = 
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
+                                                            
CommandGetTopicsOfNamespace_Mode_PERSISTENT);
     NamespaceTopicsPtr namespaceTopicsPtr;
     ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
     LOG_INFO("getTopicPartitionName Async returns " << 
namespaceTopicsPtr->size() << " topics");
@@ -273,7 +211,8 @@ TEST(LookupServiceTest, testTimeout) {
     afterMethod("getPartitionMetadataAsync");
 
     beforeMethod();
-    auto future3 = 
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    auto future3 = 
lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
+                                                            
CommandGetTopicsOfNamespace_Mode_PERSISTENT);
     NamespaceTopicsPtr namespaceTopicsPtr;
     ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
     afterMethod("getTopicsOfNamespaceAsync");
@@ -289,6 +228,50 @@ class LookupServiceTest : public 
::testing::TestWithParam<std::string> {
     Client client_{GetParam()};
 };
 
+TEST_P(LookupServiceTest, basicGetNamespaceTopics) {  // getTopicsOfNamespaceAsync under PERSISTENT, NON_PERSISTENT and ALL
+    Result result;
+
+    auto nsName = NamespaceName::get("public", GetParam().substr(0, 4) + 
std::to_string(time(nullptr)));  // namespace: first 4 chars of the test parameter + current time
+    std::string topicName1 = "persistent://" + nsName->toString() + 
"/basicGetNamespaceTopics1";
+    std::string topicName2 = "persistent://" + nsName->toString() + 
"/basicGetNamespaceTopics2";
+    std::string topicName3 = "non-persistent://" + nsName->toString() + 
"/basicGetNamespaceTopics3";
+
+    // 0. create a namespace
+    auto createNsUrl = httpLookupUrl + "/admin/v2/namespaces/" + 
nsName->toString();
+    auto res = makePutRequest(createNsUrl, "");
+    ASSERT_FALSE(res != 204 && res != 409);  // passes only when res is 204 or 409
+
+    // 1. trigger auto create topic
+    Producer producer1;
+    result = client_.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client_.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client_.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+
+    // 2. verify getTopicsOfNamespace by regex mode.
+    auto lookupServicePtr = 
PulsarFriend::getClientImplPtr(client_)->getLookup();
+    auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode,
+                               const std::set<std::string>& expectedTopics) {  // helper: query with `mode`, compare with expectedTopics
+        Future<Result, NamespaceTopicsPtr> getTopicsFuture =
+            lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode);
+        NamespaceTopicsPtr topicsData;
+        result = getTopicsFuture.get(topicsData);
+        ASSERT_EQ(ResultOk, result);
+        ASSERT_TRUE(topicsData != NULL);
+        std::set<std::string> actualTopics(topicsData->begin(), 
topicsData->end());  // copied into a set, so the order of the returned topics does not matter
+        ASSERT_EQ(expectedTopics, actualTopics);
+    };
+    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_PERSISTENT, {topicName1, 
topicName2});
+    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT, 
{topicName3});
+    verifyGetTopics(CommandGetTopicsOfNamespace_Mode_ALL, {topicName1, 
topicName2, topicName3});
+
+    client_.close();
+}
+
 TEST_P(LookupServiceTest, testGetSchema) {
     const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) 
+ GetParam().substr(0, 4);
     std::string jsonSchema =
diff --git a/tests/TopicNameTest.cc b/tests/TopicNameTest.cc
index 41838f5..74dc605 100644
--- a/tests/TopicNameTest.cc
+++ b/tests/TopicNameTest.cc
@@ -191,3 +191,20 @@ TEST(TopicNameTest, testPartitionIndex) {
         ASSERT_EQ(topicName->getPartitionIndex(), partition);
     }
 }
+
+TEST(TopicNameTest, testRemoveDomain) {  // removeDomain drops everything through "://"; other names are unchanged
+    auto topicName1 = "persistent://public/default/test-topic";
+    ASSERT_EQ("public/default/test-topic", TopicName::removeDomain(topicName1));
+
+    auto topicName2 = "non-persistent://public/default/test-topic";
+    ASSERT_EQ("public/default/test-topic", TopicName::removeDomain(topicName2));
+
+    auto topicName3 = "public/default/test-topic";  // no domain prefix: must come back unchanged
+    ASSERT_EQ(topicName3, TopicName::removeDomain(topicName3));  // was topicName2, which left this case untested
+}
+
+TEST(TopicNameTest, testContainsDomain) {  // containsDomain reports whether the name carries a "://" separator
+    ASSERT_FALSE(TopicName::containsDomain("public/default/test-topic"));
+    ASSERT_TRUE(TopicName::containsDomain("persistent://public/default/test-topic"));
+    ASSERT_TRUE(TopicName::containsDomain("non-persistent://public/default/test-topic"));
+}


Reply via email to