This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b47e63d  PIP-121: Introduce ServiceInfoProvider to update service info 
dynamically (#541)
b47e63d is described below

commit b47e63dcb21949a0ac7da18d2336a8b92d569cc6
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Mar 13 14:49:04 2026 +0800

    PIP-121: Introduce ServiceInfoProvider to update service info dynamically 
(#541)
---
 include/pulsar/Client.h                 |  24 +++++
 include/pulsar/ClientConfiguration.h    |  29 +-----
 include/pulsar/ServiceInfo.h            |  57 ++++++++++
 include/pulsar/ServiceInfoProvider.h    |  62 +++++++++++
 include/pulsar/c/client_configuration.h |   7 --
 lib/AtomicSharedPtr.h                   |  41 ++++++++
 lib/BinaryProtoLookupService.h          |   5 +-
 lib/Client.cc                           |  19 ++--
 lib/ClientConfiguration.cc              |  13 ---
 lib/ClientConfigurationImpl.h           |   7 ++
 lib/ClientConnection.cc                 |  27 +++--
 lib/ClientConnection.h                  |   8 +-
 lib/ClientImpl.cc                       | 160 +++++++++++++++++++----------
 lib/ClientImpl.h                        |  58 +++++++++--
 lib/ConnectionPool.cc                   |  36 +++----
 lib/ConnectionPool.h                    |  21 +++-
 lib/ConsumerImpl.cc                     |  17 ++-
 lib/ConsumerImpl.h                      |   7 ++
 lib/DefaultServiceInfoProvider.h        |  42 ++++++++
 lib/HTTPLookupService.cc                |  13 ++-
 lib/HTTPLookupService.h                 |   4 +-
 lib/MultiTopicsConsumerImpl.cc          |  22 ++--
 lib/MultiTopicsConsumerImpl.h           |   7 +-
 lib/PartitionedProducerImpl.cc          |   8 +-
 lib/PartitionedProducerImpl.h           |   3 -
 lib/PatternMultiTopicsConsumerImpl.cc   |  13 +--
 lib/PatternMultiTopicsConsumerImpl.h    |   1 -
 lib/ReaderImpl.cc                       |   1 -
 lib/ServiceInfo.cc                      |  35 +++++++
 lib/c/c_ClientConfiguration.cc          |  12 ---
 perf/PerfConsumer.cc                    |   5 -
 perf/PerfProducer.cc                    |   5 -
 tests/AuthTokenTest.cc                  |   2 +-
 tests/BasicEndToEndTest.cc              |   3 +-
 tests/LookupServiceTest.cc              |  91 ++++++++--------
 tests/ServiceInfoProviderTest.cc        | 177 ++++++++++++++++++++++++++++++++
 36 files changed, 787 insertions(+), 255 deletions(-)

diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 5613066..e9813e3 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -29,9 +29,12 @@
 #include <pulsar/Reader.h>
 #include <pulsar/Result.h>
 #include <pulsar/Schema.h>
+#include <pulsar/ServiceInfo.h>
+#include <pulsar/ServiceInfoProvider.h>
 #include <pulsar/TableView.h>
 #include <pulsar/defines.h>
 
+#include <memory>
 #include <string>
 
 namespace pulsar {
@@ -68,6 +71,20 @@ class PULSAR_PUBLIC Client {
      */
     Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration);
 
+    /**
+     * Create a Pulsar client object using the specified ServiceInfoProvider.
+     *
+     * The ServiceInfoProvider is responsible for providing the service 
information (such as service URL)
+     * dynamically. For example, if it detects a primary Pulsar service is 
down, it can switch to a secondary
+     * service and update the client with the new service information.
+     *
+     * The Client instance takes ownership of the given ServiceInfoProvider. 
The provider will be destroyed
+     * as part of the client's shutdown lifecycle, for example when 
`Client::close()` or
+     * `Client::closeAsync()` is called, ensuring that its lifetime is 
properly managed.
+     */
+    static Client create(std::unique_ptr<ServiceInfoProvider> 
serviceInfoProvider,
+                         const ClientConfiguration& clientConfiguration);
+
     /**
      * Create a producer with default configuration
      *
@@ -414,6 +431,13 @@ class PULSAR_PUBLIC Client {
     void getSchemaInfoAsync(const std::string& topic, int64_t version,
                             std::function<void(Result, const SchemaInfo&)> 
callback);
 
+    /**
+     * Get the current service information of the client.
+     *
+     * @return the current service information
+     */
+    ServiceInfo getServiceInfo() const;
+
    private:
     Client(const std::shared_ptr<ClientImpl>&);
 
diff --git a/include/pulsar/ClientConfiguration.h 
b/include/pulsar/ClientConfiguration.h
index 98ccff7..b37b7c6 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -70,15 +70,12 @@ class PULSAR_PUBLIC ClientConfiguration {
     /**
      * Set the authentication method to be used with the broker
      *
+     * You can get the configured authentication data in `ServiceInfo` 
returned by `Client::getServiceInfo`.
+     *
      * @param authentication the authentication data to use
      */
     ClientConfiguration& setAuth(const AuthenticationPtr& authentication);
 
-    /**
-     * @return the authentication data
-     */
-    Authentication& getAuth() const;
-
     /**
      * Set timeout on client operations (subscribe, create producer, close, 
unsubscribe)
      * Default is 30 seconds.
@@ -202,20 +199,6 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     ClientConfiguration& setLogger(LoggerFactory* loggerFactory);
 
-    /**
-     * Configure whether to use the TLS encryption on the connections.
-     *
-     * The default value is false.
-     *
-     * @param useTls
-     */
-    ClientConfiguration& setUseTls(bool useTls);
-
-    /**
-     * @return whether the TLS encryption is used on the connections
-     */
-    bool isUseTls() const;
-
     /**
      * Set the path to the TLS private key file.
      *
@@ -243,15 +226,13 @@ class PULSAR_PUBLIC ClientConfiguration {
     /**
      * Set the path to the trusted TLS certificate file.
      *
+     * You can get the configured trusted TLS certificate file path in 
`ServiceInfo` returned by
+     * `Client::getServiceInfo`.
+     *
      * @param tlsTrustCertsFilePath
      */
     ClientConfiguration& setTlsTrustCertsFilePath(const std::string& 
tlsTrustCertsFilePath);
 
-    /**
-     * @return the path to the trusted TLS certificate file
-     */
-    const std::string& getTlsTrustCertsFilePath() const;
-
     /**
      * Configure whether the Pulsar client accepts untrusted TLS certificates 
from brokers.
      *
diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h
new file mode 100644
index 0000000..1f63ce3
--- /dev/null
+++ b/include/pulsar/ServiceInfo.h
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_H_
+#define PULSAR_SERVICE_INFO_H_
+
+#include <pulsar/Authentication.h>
+
+#include <optional>
+#include <string>
+
+namespace pulsar {
+
+/**
+ * ServiceInfo encapsulates the information of a Pulsar service, which is used 
by the client to connect to the
+ * service. It includes the service URL, authentication information, and TLS 
configuration.
+ */
+class PULSAR_PUBLIC ServiceInfo final {
+   public:
+    ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = 
AuthFactory::Disabled(),
+                std::optional<std::string> tlsTrustCertsFilePath = 
std::nullopt);
+
+    auto& serviceUrl() const noexcept { return serviceUrl_; }
+    auto useTls() const noexcept { return useTls_; }
+    auto& authentication() const noexcept { return authentication_; }
+    auto& tlsTrustCertsFilePath() const noexcept { return 
tlsTrustCertsFilePath_; }
+
+    bool operator==(const ServiceInfo& other) const noexcept {
+        return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ &&
+               authentication_ == other.authentication_ &&
+               tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_;
+    }
+
+   private:
+    std::string serviceUrl_;
+    bool useTls_;
+    AuthenticationPtr authentication_;
+    std::optional<std::string> tlsTrustCertsFilePath_;
+};
+
+}  // namespace pulsar
+#endif
diff --git a/include/pulsar/ServiceInfoProvider.h 
b/include/pulsar/ServiceInfoProvider.h
new file mode 100644
index 0000000..1b518da
--- /dev/null
+++ b/include/pulsar/ServiceInfoProvider.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
+#define PULSAR_SERVICE_INFO_PROVIDER_H_
+
+#include <pulsar/ServiceInfo.h>
+
+#include <functional>
+
+namespace pulsar {
+
+class PULSAR_PUBLIC ServiceInfoProvider {
+   public:
+    /**
+     * The destructor will be called when `Client::close()` is invoked, and 
the provider should stop any
+     * ongoing work and release the resources in the destructor.
+     */
+    virtual ~ServiceInfoProvider() = default;
+
+    /**
+     * Get the initial `ServiceInfo` connection for the client.
+     * This method is called **only once** internally in `Client::create()` to 
get the initial `ServiceInfo`
+     * for the client to connect to the Pulsar service, typically before 
{@link initialize} is invoked.
+     * Since it's only called once, it's legal to return a moved `ServiceInfo` 
object to avoid unnecessary
+     * copying.
+     */
+    virtual ServiceInfo initialServiceInfo() = 0;
+
+    /**
+     * Initialize the ServiceInfoProvider.
+     *
+     * After the client has obtained the initial `ServiceInfo` via {@link 
initialServiceInfo}, this method is
+     * called to allow the provider to start any background work (for example, 
service discovery or watching
+     * configuration changes) and to report subsequent updates to the service 
information.
+     *
+     * @param onServiceInfoUpdate the callback to deliver updated 
`ServiceInfo` values to the client after
+     *                            the initial connection has been established
+     *
+     * Implementations may choose not to invoke `onServiceInfoUpdate` if the 
`ServiceInfo` never changes.
+     */
+    virtual void initialize(std::function<void(ServiceInfo)> 
onServiceInfoUpdate) = 0;
+};
+
+};  // namespace pulsar
+
+#endif
diff --git a/include/pulsar/c/client_configuration.h 
b/include/pulsar/c/client_configuration.h
index 9e15453..1be7c1f 100644
--- a/include/pulsar/c/client_configuration.h
+++ b/include/pulsar/c/client_configuration.h
@@ -147,16 +147,9 @@ PULSAR_PUBLIC void 
pulsar_client_configuration_set_logger(pulsar_client_configur
 PULSAR_PUBLIC void 
pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *conf,
                                                             pulsar_logger_t 
logger);
 
-PULSAR_PUBLIC void 
pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, 
int useTls);
-
-PULSAR_PUBLIC int 
pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf);
-
 PULSAR_PUBLIC void pulsar_client_configuration_set_tls_trust_certs_file_path(
     pulsar_client_configuration_t *conf, const char *tlsTrustCertsFilePath);
 
-PULSAR_PUBLIC const char 
*pulsar_client_configuration_get_tls_trust_certs_file_path(
-    pulsar_client_configuration_t *conf);
-
 PULSAR_PUBLIC void 
pulsar_client_configuration_set_tls_allow_insecure_connection(
     pulsar_client_configuration_t *conf, int allowInsecure);
 
diff --git a/lib/AtomicSharedPtr.h b/lib/AtomicSharedPtr.h
new file mode 100644
index 0000000..30dea07
--- /dev/null
+++ b/lib/AtomicSharedPtr.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <memory>
+namespace pulsar {
+
+// C++17 does not have std::atomic<std::shared_ptr<T>>, so we have to manually 
implement it.
+template <typename T>
+class AtomicSharedPtr {
+   public:
+    using Pointer = std::shared_ptr<const T>;
+
+    AtomicSharedPtr() = default;
+    explicit AtomicSharedPtr(T value) : ptr_(std::make_shared<const 
T>(std::move(value))) {}
+
+    auto load() const { return std::atomic_load(&ptr_); }
+
+    void store(Pointer&& newPtr) { std::atomic_store(&ptr_, 
std::move(newPtr)); }
+
+   private:
+    std::shared_ptr<const T> ptr_;
+};
+
+}  // namespace pulsar
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 948c7f1..35dcb16 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -22,6 +22,7 @@
 #include <pulsar/Authentication.h>
 #include <pulsar/ClientConfiguration.h>
 #include <pulsar/Schema.h>
+#include <pulsar/ServiceInfo.h>
 
 #include <mutex>
 
@@ -38,9 +39,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, 
SchemaInfo>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
-    BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& 
pool,
+    BinaryProtoLookupService(const ServiceInfo& serviceInfo, ConnectionPool& 
pool,
                              const ClientConfiguration& clientConfiguration)
-        : serviceNameResolver_(serviceUrl),
+        : serviceNameResolver_(serviceInfo.serviceUrl()),
           cnxPool_(pool),
           listenerName_(clientConfiguration.getListenerName()),
           maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
diff --git a/lib/Client.cc b/lib/Client.cc
index 39a5948..48e92dd 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include <pulsar/Client.h>
+#include <pulsar/ServiceInfoProvider.h>
 
 #include <iostream>
 #include <memory>
@@ -33,13 +34,17 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
-Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {}
+Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { 
impl_->initialize(); }
 
-Client::Client(const std::string& serviceUrl)
-    : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
+Client::Client(const std::string& serviceUrl) : Client(serviceUrl, 
ClientConfiguration()) {}
 
 Client::Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration)
-    : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
+    : Client(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
+
+Client Client::create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
+                      const ClientConfiguration& clientConfiguration) {
+    return Client(std::make_shared<ClientImpl>(std::move(serviceInfoProvider), 
clientConfiguration));
+}
 
 Result Client::createProducer(const std::string& topic, Producer& producer) {
     return createProducer(topic, ProducerConfiguration(), producer);
@@ -193,8 +198,10 @@ uint64_t Client::getNumberOfConsumers() { return 
impl_->getNumberOfConsumers();
 
 void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
                                 std::function<void(Result, const SchemaInfo&)> 
callback) {
-    impl_->getLookup()
-        ->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "")
+    impl_->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "")
         .addListener(std::move(callback));
 }
+
+ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }
+
 }  // namespace pulsar
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index b99c5d2..c59dd43 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -57,8 +57,6 @@ ClientConfiguration& ClientConfiguration::setAuth(const 
AuthenticationPtr& authe
     return *this;
 }
 
-Authentication& ClientConfiguration::getAuth() const { return 
*impl_->authenticationPtr; }
-
 const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return 
impl_->authenticationPtr; }
 
 ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int 
timeout) {
@@ -94,13 +92,6 @@ ClientConfiguration& 
ClientConfiguration::setMessageListenerThreads(int threads)
 
 int ClientConfiguration::getMessageListenerThreads() const { return 
impl_->messageListenerThreads; }
 
-ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) {
-    impl_->useTls = useTls;
-    return *this;
-}
-
-bool ClientConfiguration::isUseTls() const { return impl_->useTls; }
-
 ClientConfiguration& ClientConfiguration::setValidateHostName(bool 
validateHostName) {
     impl_->validateHostName = validateHostName;
     return *this;
@@ -131,10 +122,6 @@ ClientConfiguration& 
ClientConfiguration::setTlsTrustCertsFilePath(const std::st
     return *this;
 }
 
-const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const {
-    return impl_->tlsTrustCertsFilePath;
-}
-
 ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool 
allowInsecure) {
     impl_->tlsAllowInsecureConnection = allowInsecure;
     return *this;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index e7a83a1..45b2aa3 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -20,8 +20,10 @@
 #define LIB_CLIENTCONFIGURATIONIMPL_H_
 
 #include <pulsar/ClientConfiguration.h>
+#include <pulsar/ServiceInfo.h>
 
 #include <chrono>
+#include <optional>
 
 namespace pulsar {
 
@@ -53,6 +55,11 @@ struct ClientConfigurationImpl {
     ClientConfiguration::ProxyProtocol proxyProtocol;
 
     std::unique_ptr<LoggerFactory> takeLogger() { return 
std::move(loggerFactory); }
+
+    ServiceInfo toServiceInfo(const std::string& serviceUrl) const {
+        return {serviceUrl, authenticationPtr,
+                tlsTrustCertsFilePath.empty() ? std::nullopt : 
std::make_optional(tlsTrustCertsFilePath)};
+    }
 };
 }  // namespace pulsar
 
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index c373c25..cc7e1f6 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -19,7 +19,9 @@
 #include "ClientConnection.h"
 
 #include <openssl/x509.h>
+#include <pulsar/Authentication.h>
 #include <pulsar/MessageIdBuilder.h>
+#include <pulsar/ServiceInfo.h>
 
 #include <chrono>
 #include <fstream>
@@ -37,6 +39,8 @@
 #include "ProducerImpl.h"
 #include "PulsarApi.pb.h"
 #include "ResultUtils.h"
+#include "ServiceNameResolver.h"
+#include "ServiceURI.h"
 #include "Url.h"
 #include "auth/AuthOauth2.h"
 #include "auth/InitialAuthData.h"
@@ -179,12 +183,11 @@ static bool file_exists(const std::string& path) {
 std::atomic<int32_t> 
ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};
 
 ClientConnection::ClientConnection(const std::string& logicalAddress, const 
std::string& physicalAddress,
-                                   const ExecutorServicePtr& executor,
+                                   const ServiceInfo& serviceInfo, const 
ExecutorServicePtr& executor,
                                    const ClientConfiguration& 
clientConfiguration,
-                                   const AuthenticationPtr& authentication, 
const std::string& clientVersion,
-                                   ConnectionPool& pool, size_t poolIndex)
+                                   const std::string& clientVersion, 
ConnectionPool& pool, size_t poolIndex)
     : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
-      authentication_(authentication),
+      authentication_(serviceInfo.authentication()),
       serverProtocolVersion_(proto::ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
@@ -210,15 +213,14 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
         return;
     }
 
-    auto oauth2Auth = std::dynamic_pointer_cast<AuthOauth2>(authentication_);
-    if (oauth2Auth) {
+    if (auto oauth2Auth = 
std::dynamic_pointer_cast<AuthOauth2>(authentication_)) {
         // Configure the TLS trust certs file for Oauth2
         auto authData = std::dynamic_pointer_cast<AuthenticationDataProvider>(
-            
std::make_shared<InitialAuthData>(clientConfiguration.getTlsTrustCertsFilePath()));
+            
std::make_shared<InitialAuthData>(serviceInfo.tlsTrustCertsFilePath().value_or("")));
         oauth2Auth->getAuthData(authData);
     }
 
-    if (clientConfiguration.isUseTls()) {
+    if (serviceInfo.useTls()) {
         ASIO::ssl::context ctx(ASIO::ssl::context::sslv23_client);
         ctx.set_options(ASIO::ssl::context::default_workarounds | 
ASIO::ssl::context::no_sslv2 |
                         ASIO::ssl::context::no_sslv3 | 
ASIO::ssl::context::no_tlsv1 |
@@ -239,8 +241,8 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
         } else {
             ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
 
-            const auto& trustCertFilePath = 
clientConfiguration.getTlsTrustCertsFilePath();
-            if (!trustCertFilePath.empty()) {
+            if (serviceInfo.tlsTrustCertsFilePath()) {
+                const auto& trustCertFilePath = 
*serviceInfo.tlsTrustCertsFilePath();
                 if (file_exists(trustCertFilePath)) {
                     ctx.load_verify_file(trustCertFilePath);
                 } else {
@@ -1247,7 +1249,7 @@ void ClientConnection::handleConsumerStatsTimeout(const 
ASIO_ERROR& ec,
     startConsumerStatsTimer(consumerStatsRequests);
 }
 
-const std::future<void>& ClientConnection::close(Result result) {
+const std::future<void>& ClientConnection::close(Result result, bool 
switchCluster) {
     Lock lock(mutex_);
     if (closeFuture_) {
         connectPromise_.setFailed(result);
@@ -1332,6 +1334,9 @@ const std::future<void>& ClientConnection::close(Result 
result) {
     for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); 
++it) {
         auto consumer = it->second.lock();
         if (consumer) {
+            if (switchCluster) {
+                consumer->onClusterSwitching();
+            }
             consumer->handleDisconnection(result, self);
         }
     }
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index b9880ee..75e4bca 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -20,6 +20,7 @@
 #define _PULSAR_CLIENT_CONNECTION_HEADER_
 
 #include <pulsar/ClientConfiguration.h>
+#include <pulsar/ServiceInfo.h>
 #include <pulsar/defines.h>
 
 #include <any>
@@ -145,8 +146,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      *
      */
     ClientConnection(const std::string& logicalAddress, const std::string& 
physicalAddress,
-                     const ExecutorServicePtr& executor, const 
ClientConfiguration& clientConfiguration,
-                     const AuthenticationPtr& authentication, const 
std::string& clientVersion,
+                     const ServiceInfo& serviceInfo, const ExecutorServicePtr& 
executor,
+                     const ClientConfiguration& clientConfiguration, const 
std::string& clientVersion,
                      ConnectionPool& pool, size_t poolIndex);
     ~ClientConnection();
 
@@ -160,8 +161,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * Close the connection.
      *
      * @param result all pending futures will complete with this result
+     * @param switchCluster whether the close is triggered by cluster switching
      */
-    const std::future<void>& close(Result result = ResultConnectError);
+    const std::future<void>& close(Result result = ResultConnectError, bool 
switchCluster = false);
 
     bool isClosed() const;
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index eec3b34..b84c14c 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -24,7 +24,9 @@
 #include <algorithm>
 #include <chrono>
 #include <iterator>
+#include <mutex>
 #include <random>
+#include <shared_mutex>
 #include <sstream>
 
 #include "BinaryProtoLookupService.h"
@@ -32,6 +34,7 @@
 #include "Commands.h"
 #include "ConsumerImpl.h"
 #include "ConsumerInterceptors.h"
+#include "DefaultServiceInfoProvider.h"
 #include "ExecutorService.h"
 #include "HTTPLookupService.h"
 #include "LogUtils.h"
@@ -74,20 +77,17 @@ std::string generateRandomName() {
     return randomName;
 }
 
-typedef std::unique_lock<std::mutex> Lock;
-
 typedef std::vector<std::string> StringList;
 
-static LookupServicePtr defaultLookupServiceFactory(const std::string& 
serviceUrl,
+static LookupServicePtr defaultLookupServiceFactory(const ServiceInfo& 
serviceInfo,
                                                     const ClientConfiguration& 
clientConfiguration,
-                                                    ConnectionPool& pool, 
const AuthenticationPtr& auth) {
-    if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
+                                                    ConnectionPool& pool) {
+    if (ServiceNameResolver::useHttp(ServiceURI(serviceInfo.serviceUrl()))) {
         LOG_DEBUG("Using HTTP Lookup");
-        return std::make_shared<HTTPLookupService>(serviceUrl, 
std::cref(clientConfiguration),
-                                                   std::cref(auth));
+        return std::make_shared<HTTPLookupService>(std::cref(serviceInfo), 
std::cref(clientConfiguration));
     } else {
         LOG_DEBUG("Using Binary Lookup");
-        return std::make_shared<BinaryProtoLookupService>(serviceUrl, 
std::ref(pool),
+        return 
std::make_shared<BinaryProtoLookupService>(std::cref(serviceInfo), 
std::ref(pool),
                                                           
std::cref(clientConfiguration));
     }
 }
@@ -97,17 +97,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration&
 
 ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration& clientConfiguration,
                        LookupServiceFactory&& lookupServiceFactory)
-    : mutex_(),
+    : 
ClientImpl(std::make_unique<DefaultServiceInfoProvider>(std::cref(serviceUrl),
+                                                              
std::cref(*clientConfiguration.impl_)),
+                 clientConfiguration, std::move(lookupServiceFactory)) {}
+
+ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider> 
serviceInfoProvider,
+                       const ClientConfiguration& clientConfiguration)
+    : ClientImpl(std::move(serviceInfoProvider), clientConfiguration, 
&defaultLookupServiceFactory) {}
+
+ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider> 
serviceInfoProvider,
+                       const ClientConfiguration& clientConfiguration,
+                       LookupServiceFactory&& lookupServiceFactory)
+    : serviceInfoProvider_(std::move(serviceInfoProvider)),
       state_(Open),
-      clientConfiguration_(ClientConfiguration(clientConfiguration)
-                               
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
+      clientConfiguration_(clientConfiguration),
+      serviceInfo_(serviceInfoProvider_->initialServiceInfo()),
       memoryLimitController_(clientConfiguration.getMemoryLimit()),
       
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
       listenerExecutorProvider_(
           
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
       partitionListenerExecutorProvider_(
           
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
-      pool_(clientConfiguration_, ioExecutorProvider_, 
clientConfiguration_.getAuthPtr(),
+      pool_(serviceInfo_, clientConfiguration_, ioExecutorProvider_,
             ClientImpl::getClientVersion(clientConfiguration)),
       producerIdGenerator_(0),
       consumerIdGenerator_(0),
@@ -119,14 +130,24 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, 
const ClientConfiguration&
     if (loggerFactory) {
         LogUtils::setLoggerFactory(std::move(loggerFactory));
     }
-    lookupServicePtr_ = createLookup(serviceUrl);
+
+    lookupServicePtr_ = createLookup(*serviceInfo_.load());
 }
 
 ClientImpl::~ClientImpl() { shutdown(); }
 
-LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
+void ClientImpl::initialize() {
+    auto weakSelf = weak_from_this();
+    serviceInfoProvider_->initialize([weakSelf](ServiceInfo serviceInfo) {
+        if (auto self = weakSelf.lock()) {
+            self->updateServiceInfo(std::move(serviceInfo));
+        }
+    });
+}
+
+LookupServicePtr ClientImpl::createLookup(ServiceInfo serviceInfo) {
     auto lookupServicePtr = RetryableLookupService::create(
-        lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, 
clientConfiguration_.getAuthPtr()),
+        lookupServiceFactory_(std::move(serviceInfo), clientConfiguration_, 
pool_),
         clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
     return lookupServicePtr;
 }
@@ -144,19 +165,26 @@ ExecutorServiceProviderPtr 
ClientImpl::getPartitionListenerExecutorProvider() {
 }
 
 LookupServicePtr ClientImpl::getLookup(const std::string& 
redirectedClusterURI) {
+    std::shared_lock readLock(mutex_);
     if (redirectedClusterURI.empty()) {
         return lookupServicePtr_;
     }
 
-    Lock lock(mutex_);
-    auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
-    if (it == redirectedClusterLookupServicePtrs_.end()) {
-        auto lookup = createLookup(redirectedClusterURI);
-        redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, 
lookup);
-        return lookup;
+    if (auto it = 
redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
+        it != redirectedClusterLookupServicePtrs_.end()) {
+        return it->second;
     }
+    readLock.unlock();
 
-    return it->second;
+    std::unique_lock writeLock(mutex_);
+    // Double check in case another thread acquires the lock and inserts a 
pair first
+    if (auto it = 
redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
+        it != redirectedClusterLookupServicePtrs_.end()) {
+        return it->second;
+    }
+    auto lookup = createRedirectedLookup(redirectedClusterURI);
+    redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
+    return lookup;
 }
 
 void ClientImpl::createProducerAsync(const std::string& topic, const 
ProducerConfiguration& conf,
@@ -166,7 +194,7 @@ void ClientImpl::createProducerAsync(const std::string& 
topic, const ProducerCon
     }
     TopicNamePtr topicName;
     {
-        Lock lock(mutex_);
+        std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
             callback(ResultAlreadyClosed, Producer());
@@ -180,7 +208,7 @@ void ClientImpl::createProducerAsync(const std::string& 
topic, const ProducerCon
 
     if (autoDownloadSchema) {
         auto self = shared_from_this();
-        lookupServicePtr_->getSchema(topicName).addListener(
+        getSchema(topicName).addListener(
             [self, topicName, callback](Result res, const SchemaInfo& 
topicSchema) {
                 if (res != ResultOk) {
                     callback(res, Producer());
@@ -188,12 +216,12 @@ void ClientImpl::createProducerAsync(const std::string& 
topic, const ProducerCon
                 }
                 ProducerConfiguration conf;
                 conf.setSchema(topicSchema);
-                
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+                self->getPartitionMetadataAsync(topicName).addListener(
                     std::bind(&ClientImpl::handleCreateProducer, self, 
std::placeholders::_1,
                               std::placeholders::_2, topicName, conf, 
callback));
             });
     } else {
-        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        getPartitionMetadataAsync(topicName).addListener(
             std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), 
std::placeholders::_1,
                       std::placeholders::_2, topicName, conf, callback));
     }
@@ -253,7 +281,7 @@ void ClientImpl::createReaderAsync(const std::string& 
topic, const MessageId& st
                                    const ReaderConfiguration& conf, const 
ReaderCallback& callback) {
     TopicNamePtr topicName;
     {
-        Lock lock(mutex_);
+        std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
             callback(ResultAlreadyClosed, Reader());
@@ -266,7 +294,7 @@ void ClientImpl::createReaderAsync(const std::string& 
topic, const MessageId& st
     }
 
     MessageId msgId(startMessageId);
-    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+    getPartitionMetadataAsync(topicName).addListener(
         std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), 
std::placeholders::_1,
                   std::placeholders::_2, topicName, msgId, conf, callback));
 }
@@ -275,7 +303,7 @@ void ClientImpl::createTableViewAsync(const std::string& 
topic, const TableViewC
                                       const TableViewCallback& callback) {
     TopicNamePtr topicName;
     {
-        Lock lock(mutex_);
+        std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
             callback(ResultAlreadyClosed, TableView());
@@ -341,7 +369,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& 
regexPattern, const
                                          const SubscribeCallback& callback) {
     TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
 
-    Lock lock(mutex_);
+    std::shared_lock lock(mutex_);
     if (state_ != Open) {
         lock.unlock();
         callback(ResultAlreadyClosed, Consumer());
@@ -379,7 +407,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& 
regexPattern, const
             return;
     }
 
-    
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), 
mode)
+    getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
         .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(),
                                std::placeholders::_1, std::placeholders::_2, 
regexPattern, mode,
                                subscriptionName, conf, callback));
@@ -401,9 +429,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result 
result, const Namespace
 
         auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
 
-        consumer = 
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), 
regexPattern, mode,
-                                                                    
*matchTopics, subscriptionName, conf,
-                                                                    
lookupServicePtr_, interceptors);
+        consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
+            shared_from_this(), regexPattern, mode, *matchTopics, 
subscriptionName, conf, interceptors);
 
         consumer->getConsumerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), 
std::placeholders::_1,
@@ -426,7 +453,7 @@ void ClientImpl::subscribeAsync(const 
std::vector<std::string>& originalTopics,
     auto it = std::unique(topics.begin(), topics.end());
     auto newSize = std::distance(topics.begin(), it);
     topics.resize(newSize);
-    Lock lock(mutex_);
+    std::shared_lock lock(mutex_);
     if (state_ != Open) {
         lock.unlock();
         callback(ResultAlreadyClosed, Consumer());
@@ -450,7 +477,7 @@ void ClientImpl::subscribeAsync(const 
std::vector<std::string>& originalTopics,
     auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
 
     ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
-        shared_from_this(), topics, subscriptionName, topicNamePtr, conf, 
lookupServicePtr_, interceptors);
+        shared_from_this(), topics, subscriptionName, topicNamePtr, conf, 
interceptors);
 
     
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
                                                                
shared_from_this(), std::placeholders::_1,
@@ -462,7 +489,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, 
const std::string& sub
                                 const ConsumerConfiguration& conf, const 
SubscribeCallback& callback) {
     TopicNamePtr topicName;
     {
-        Lock lock(mutex_);
+        std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
             callback(ResultAlreadyClosed, Consumer());
@@ -480,7 +507,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, 
const std::string& sub
         }
     }
 
-    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+    getPartitionMetadataAsync(topicName).addListener(
         std::bind(&ClientImpl::handleSubscribe, shared_from_this(), 
std::placeholders::_1,
                   std::placeholders::_2, topicName, subscriptionName, conf, 
callback));
 }
@@ -503,9 +530,9 @@ void ClientImpl::handleSubscribe(Result result, const 
LookupDataResultPtr& parti
                     callback(ResultInvalidConfiguration, Consumer());
                     return;
                 }
-                consumer = std::make_shared<MultiTopicsConsumerImpl>(
-                    shared_from_this(), topicName, 
partitionMetadata->getPartitions(), subscriptionName, conf,
-                    lookupServicePtr_, interceptors);
+                consumer = 
std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
+                                                                     
partitionMetadata->getPartitions(),
+                                                                     
subscriptionName, conf, interceptors);
             } else {
                 auto consumerImpl = 
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
                                                                    
subscriptionName, conf,
@@ -647,7 +674,7 @@ void ClientImpl::handleGetPartitions(Result result, const 
LookupDataResultPtr& p
 void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const 
GetPartitionsCallback& callback) {
     TopicNamePtr topicName;
     {
-        Lock lock(mutex_);
+        std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
             callback(ResultAlreadyClosed, StringList());
@@ -658,13 +685,16 @@ void ClientImpl::getPartitionsForTopicAsync(const 
std::string& topic, const GetP
             return;
         }
     }
-    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-        std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), 
std::placeholders::_1,
-                  std::placeholders::_2, topicName, callback));
+    
getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions,
+                                                               
shared_from_this(), std::placeholders::_1,
+                                                               
std::placeholders::_2, topicName, callback));
 }
 
 void ClientImpl::closeAsync(const CloseCallback& callback) {
+    serviceInfoProvider_.reset();
+    std::unique_lock lock(mutex_);
     if (state_ != Open) {
+        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -678,6 +708,8 @@ void ClientImpl::closeAsync(const CloseCallback& callback) {
     for (const auto& it : redirectedClusterLookupServicePtrs_) {
         it.second->close();
     }
+    redirectedClusterLookupServicePtrs_.clear();
+    lock.unlock();
 
     auto producers = producers_.move();
     auto consumers = consumers_.move();
@@ -726,7 +758,7 @@ void ClientImpl::handleClose(Result result, const 
SharedInt& numberOfOpenHandler
         --(*numberOfOpenHandlers);
     }
     if (*numberOfOpenHandlers == 0) {
-        Lock lock(mutex_);
+        std::unique_lock lock(mutex_);
         if (state_ == Closed) {
             LOG_DEBUG("Client is already shutting down, possible race 
condition in handleClose");
             return;
@@ -776,7 +808,9 @@ void ClientImpl::shutdown() {
                                    << " consumers have been shutdown.");
     }
 
+    std::shared_lock lock(mutex_);
     lookupServicePtr_->close();
+    lock.unlock();
     if (!pool_.close()) {
         // pool_ has already been closed. It means shutdown() has been called 
before.
         return;
@@ -805,15 +839,9 @@ void ClientImpl::shutdown() {
     lookupCount_ = 0;
 }
 
-uint64_t ClientImpl::newProducerId() {
-    Lock lock(mutex_);
-    return producerIdGenerator_++;
-}
+uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; }
 
-uint64_t ClientImpl::newConsumerId() {
-    Lock lock(mutex_);
-    return consumerIdGenerator_++;
-}
+uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; }
 
 uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }
 
@@ -854,4 +882,28 @@ std::chrono::nanoseconds 
ClientImpl::getOperationTimeout(const ClientConfigurati
     return clientConfiguration.impl_->operationTimeout;
 }
 
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+    std::unique_lock lock{mutex_};
+    if (state_ != Open) {
+        LOG_ERROR("Client is not open, cannot update service info");
+        return;
+    }
+
+    serviceInfo_.store(std::make_shared<const ServiceInfo>(serviceInfo));
+    pool_.closeAllConnectionsForNewCluster();
+    if (lookupServicePtr_) {
+        lookupServicePtr_->close();
+    }
+    lookupServicePtr_ = createLookup(serviceInfo);
+
+    for (auto&& it : redirectedClusterLookupServicePtrs_) {
+        it.second->close();
+    }
+    redirectedClusterLookupServicePtrs_.clear();
+    useProxy_ = false;
+    lookupCount_ = 0;
+}
+
+ServiceInfo ClientImpl::getServiceInfo() const { return 
*(serviceInfo_.load()); }
+
 } /* namespace pulsar */
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 0b4d596..7772b15 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -20,14 +20,20 @@
 #define LIB_CLIENTIMPL_H_
 
 #include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
+#include <pulsar/ServiceInfoProvider.h>
 
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <mutex>
+#include <shared_mutex>
 
+#include "AtomicSharedPtr.h"
 #include "ConnectionPool.h"
 #include "Future.h"
 #include "LookupDataResult.h"
+#include "LookupService.h"
 #include "MemoryLimitController.h"
 #include "ProtoApiEnums.h"
 #include "SynchronizedHashMap.h"
@@ -52,10 +58,8 @@ typedef std::weak_ptr<ConsumerImplBase> 
ConsumerImplBaseWeakPtr;
 class ClientConnection;
 using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
 
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
-using LookupServiceFactory = std::function<LookupServicePtr(const 
std::string&, const ClientConfiguration&,
-                                                            ConnectionPool& 
pool, const AuthenticationPtr&)>;
+using LookupServiceFactory = std::function<LookupServicePtr(
+    ServiceInfo&& serviceInfo, const ClientConfiguration&, ConnectionPool& 
pool)>;
 
 class ProducerImplBase;
 using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -71,12 +75,17 @@ std::string generateRandomName();
 
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
    public:
+    ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
+               const ClientConfiguration& clientConfiguration);
+    ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
+               const ClientConfiguration& clientConfiguration, 
LookupServiceFactory&& lookupServiceFactory);
     ClientImpl(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration);
 
     // only for tests
     ClientImpl(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
                LookupServiceFactory&& lookupServiceFactory);
 
+    void initialize();
     virtual ~ClientImpl();
 
     /**
@@ -128,7 +137,6 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getIOExecutorProvider();
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
-    LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
 
     void cleanupProducer(ProducerImplBase* address) { 
producers_.remove(address); }
 
@@ -139,6 +147,26 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     ConnectionPool& getConnectionPool() noexcept { return pool_; }
     uint64_t getLookupCount() { return lookupCount_; }
 
+    void updateServiceInfo(ServiceInfo&& serviceInfo);
+    ServiceInfo getServiceInfo() const;
+
+    // Since the underlying `lookupServicePtr_` can be modified by 
`updateServiceInfo`, we should not expose
+    // it to other classes, otherwise the update might not be visible.
+    auto getPartitionMetadataAsync(const TopicNamePtr& topicName) {
+        std::shared_lock lock(mutex_);
+        return lookupServicePtr_->getPartitionMetadataAsync(topicName);
+    }
+
+    auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName, 
CommandGetTopicsOfNamespace_Mode mode) {
+        std::shared_lock lock(mutex_);
+        return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode);
+    }
+
+    auto getSchema(const TopicNamePtr& topicName, const std::string& version = 
"") {
+        std::shared_lock lock(mutex_);
+        return lookupServicePtr_->getSchema(topicName, version);
+    }
+
     static std::chrono::nanoseconds getOperationTimeout(const 
ClientConfiguration& clientConfiguration);
 
     friend class PulsarFriend;
@@ -177,7 +205,17 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     const std::string& getPhysicalAddress(const std::string& 
redirectedClusterURI,
                                           const std::string& logicalAddress);
 
-    LookupServicePtr createLookup(const std::string& serviceUrl);
+    // This overload is only used for blue-green migration, where only the 
service URL is modified, the other
+    // parameters remain the same
+    LookupServicePtr createRedirectedLookup(const std::string& redirectedUrl) {
+        auto serviceInfo = serviceInfo_.load();
+        return createLookup(
+            ServiceInfo{redirectedUrl, serviceInfo->authentication(), 
serviceInfo->tlsTrustCertsFilePath()});
+    }
+
+    LookupServicePtr createLookup(ServiceInfo serviceInfo);
+
+    LookupServicePtr getLookup(const std::string& redirectedClusterURI);
 
     static std::string getClientVersion(const ClientConfiguration& 
clientConfiguration);
 
@@ -188,10 +226,12 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
         Closed
     };
 
-    std::mutex mutex_;
+    std::unique_ptr<ServiceInfoProvider> serviceInfoProvider_;
+    mutable std::shared_mutex mutex_;
 
     State state_;
     ClientConfiguration clientConfiguration_;
+    AtomicSharedPtr<ServiceInfo> serviceInfo_;
     MemoryLimitController memoryLimitController_;
 
     ExecutorServiceProviderPtr ioExecutorProvider_;
@@ -202,8 +242,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     std::unordered_map<std::string, LookupServicePtr> 
redirectedClusterLookupServicePtrs_;
     ConnectionPool pool_;
 
-    uint64_t producerIdGenerator_;
-    uint64_t consumerIdGenerator_;
+    std::atomic_uint64_t producerIdGenerator_;
+    std::atomic_uint64_t consumerIdGenerator_;
     std::shared_ptr<std::atomic<uint64_t>> 
requestIdGenerator_{std::make_shared<std::atomic<uint64_t>>(0)};
 
     SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index c814cf8..6465ff7 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -38,12 +38,13 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
-ConnectionPool::ConnectionPool(const ClientConfiguration& conf,
+ConnectionPool::ConnectionPool(const AtomicSharedPtr<ServiceInfo>& serviceInfo,
+                               const ClientConfiguration& conf,
                                const ExecutorServiceProviderPtr& 
executorProvider,
-                               const AuthenticationPtr& authentication, const 
std::string& clientVersion)
-    : clientConfiguration_(conf),
+                               const std::string& clientVersion)
+    : serviceInfo_(serviceInfo),
+      clientConfiguration_(conf),
       executorProvider_(executorProvider),
-      authentication_(authentication),
       clientVersion_(clientVersion),
       randomDistribution_(0, conf.getConnectionsPerBroker() - 1),
       
randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count())
 {}
@@ -54,19 +55,8 @@ bool ConnectionPool::close() {
         return false;
     }
 
-    std::vector<ClientConnectionPtr> connectionsToClose;
-    // ClientConnection::close() will remove the connection from the pool, 
which is not allowed when iterating
-    // over a map, so we store the connections to close in a vector first and 
don't iterate the pool when
-    // closing the connections.
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    connectionsToClose.reserve(pool_.size());
-    for (auto&& kv : pool_) {
-        connectionsToClose.emplace_back(kv.second);
-    }
-    pool_.clear();
-    lock.unlock();
-
-    for (auto&& cnx : connectionsToClose) {
+    for (auto&& kv : releaseConnections()) {
+        auto& cnx = kv.second;
         if (cnx) {
             // Close with a fatal error to not let client retry
             auto& future = cnx->close(ResultAlreadyClosed);
@@ -94,6 +84,12 @@ bool ConnectionPool::close() {
     return true;
 }
 
+void ConnectionPool::closeAllConnectionsForNewCluster() {
+    for (auto&& kv : releaseConnections()) {
+        kv.second->close(ResultDisconnected, true);
+    }
+}
+
 static const std::string getKey(const std::string& logicalAddress, const 
std::string& physicalAddress,
                                 size_t keySuffix) {
     std::stringstream ss;
@@ -134,9 +130,9 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(const
     // No valid or pending connection found in the pool, creating a new one
     ClientConnectionPtr cnx;
     try {
-        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, 
executorProvider_->get(keySuffix),
-                                       clientConfiguration_, authentication_, 
clientVersion_, *this,
-                                       keySuffix));
+        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, 
*serviceInfo_.load(),
+                                       executorProvider_->get(keySuffix), 
clientConfiguration_,
+                                       clientVersion_, *this, keySuffix));
     } catch (Result result) {
         Promise<Result, ClientConnectionWeakPtr> promise;
         promise.setFailed(result);
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 0e3a6d0..f828ac6 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/ClientConfiguration.h>
 #include <pulsar/Result.h>
+#include <pulsar/ServiceInfo.h>
 #include <pulsar/defines.h>
 
 #include <atomic>
@@ -31,6 +32,7 @@
 #include <string>
 
 #include "Future.h"
+#include "lib/AtomicSharedPtr.h"
 namespace pulsar {
 
 class ClientConnection;
@@ -41,8 +43,8 @@ using ExecutorServiceProviderPtr = 
std::shared_ptr<ExecutorServiceProvider>;
 
 class PULSAR_PUBLIC ConnectionPool {
    public:
-    ConnectionPool(const ClientConfiguration& conf, const 
ExecutorServiceProviderPtr& executorProvider,
-                   const AuthenticationPtr& authentication, const std::string& 
clientVersion);
+    ConnectionPool(const AtomicSharedPtr<ServiceInfo>& serviceInfo, const 
ClientConfiguration& conf,
+                   const ExecutorServiceProviderPtr& executorProvider, const 
std::string& clientVersion);
 
     /**
      * Close the connection pool.
@@ -51,6 +53,12 @@ class PULSAR_PUBLIC ConnectionPool {
      */
     bool close();
 
+    /**
+     * Close all existing connections and notify the connection that a new 
cluster will be used.
+     * Unlike close(), the pool remains open for new connections.
+     */
+    void closeAllConnectionsForNewCluster();
+
     void remove(const std::string& logicalAddress, const std::string& 
physicalAddress, size_t keySuffix,
                 ClientConnection* value);
 
@@ -90,9 +98,9 @@ class PULSAR_PUBLIC ConnectionPool {
     size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
 
    private:
+    const AtomicSharedPtr<ServiceInfo>& serviceInfo_;
     ClientConfiguration clientConfiguration_;
     ExecutorServiceProviderPtr executorProvider_;
-    AuthenticationPtr authentication_;
     typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
     PoolMap pool_;
     const std::string clientVersion_;
@@ -102,6 +110,13 @@ class PULSAR_PUBLIC ConnectionPool {
     std::uniform_int_distribution<> randomDistribution_;
     std::mt19937 randomEngine_;
 
+    auto releaseConnections() {
+        decltype(pool_) pool;
+        std::lock_guard lock{mutex_};
+        pool.swap(pool_);
+        return pool;
+    }
+
     friend class PulsarFriend;
 };
 }  // namespace pulsar
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 757b6e8..026f146 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, 
const std::string& topic
       negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, 
*this, conf)),
       ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
       readCompacted_(conf.isReadCompacted()),
-      startMessageId_(pulsar::getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
+      startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
+      startMessageId_(startMessageIdFromConfig_),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
       
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -1134,6 +1135,20 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
     }
 }
 
+void ConsumerImpl::onClusterSwitching() {
+    {
+        LockGuard lock{mutex_};
+        incomingMessages_.clear();
+        startMessageId_ = startMessageIdFromConfig_;
+        lastDequedMessageId_ = MessageId::earliest();
+        lastMessageIdInBroker_ = MessageId::earliest();
+        seekStatus_ = SeekStatus::NOT_STARTED;
+        lastSeekArg_.reset();
+    }
+    setRedirectedClusterURI("");
+    ackGroupingTrackerPtr_->flushAndClean();
+}
+
 /**
  * Clear the internal receiver queue and returns the message id of what was 
the 1st message in the queue that
  * was
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 0da82a2..6f287aa 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase {
     void doImmediateAck(const MessageId& msgId, const ResultCallback& 
callback, CommandAck_AckType ackType);
     void doImmediateAck(const std::set<MessageId>& msgIds, const 
ResultCallback& callback);
 
+    void onClusterSwitching();
+
    protected:
     // overrided methods from HandlerBase
     Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override;
@@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase {
 
     MessageId lastDequedMessageId_{MessageId::earliest()};
     MessageId lastMessageIdInBroker_{MessageId::earliest()};
+
+    // When the consumer switches to a new cluster, we should reset 
`startMessageId_` to the original value,
+    // otherwise, the message id of the old cluster might be passed in the 
Subscribe request on the new
+    // cluster.
+    const optional<MessageId> startMessageIdFromConfig_;
     optional<MessageId> startMessageId_;
 
     SeekStatus seekStatus_{SeekStatus::NOT_STARTED};
diff --git a/lib/DefaultServiceInfoProvider.h b/lib/DefaultServiceInfoProvider.h
new file mode 100644
index 0000000..6479bf9
--- /dev/null
+++ b/lib/DefaultServiceInfoProvider.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <string>
+
+#include "ClientConfigurationImpl.h"
+
+namespace pulsar {
+
+class DefaultServiceInfoProvider : public ServiceInfoProvider {
+   public:
+    DefaultServiceInfoProvider(const std::string& serviceUrl, const 
ClientConfigurationImpl& config)
+        : serviceInfo_(config.toServiceInfo(serviceUrl)) {}
+
+    ServiceInfo initialServiceInfo() override { return 
std::move(serviceInfo_); }
+
+    void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) 
override {}
+
+   private:
+    ServiceInfo serviceInfo_;
+};
+
+}  // namespace pulsar
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 79d9e94..0be9713 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -47,18 +47,17 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
 const static std::string PARTITION_METHOD_NAME = "partitions";
 const static int NUMBER_OF_LOOKUP_THREADS = 1;
 
-HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
-                                     const ClientConfiguration 
&clientConfiguration,
-                                     const AuthenticationPtr &authData)
+HTTPLookupService::HTTPLookupService(const ServiceInfo &serviceInfo,
+                                     const ClientConfiguration 
&clientConfiguration)
     : 
executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
-      serviceNameResolver_(serviceUrl),
-      authenticationPtr_(authData),
+      serviceNameResolver_(serviceInfo.serviceUrl()),
+      authenticationPtr_(serviceInfo.authentication()),
       
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
       maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
       tlsPrivateFilePath_(clientConfiguration.getTlsPrivateKeyFilePath()),
       tlsCertificateFilePath_(clientConfiguration.getTlsCertificateFilePath()),
-      tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
-      isUseTls_(clientConfiguration.isUseTls()),
+      tlsTrustCertsFilePath_(serviceInfo.tlsTrustCertsFilePath().value_or("")),
+      isUseTls_(serviceInfo.useTls()),
       tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
       tlsValidateHostname_(clientConfiguration.isValidateHostName()) {}
 
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index d17edd5..61a0615 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -19,6 +19,8 @@
 #ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H
 #define PULSAR_CPP_HTTPLOOKUPSERVICE_H
 
+#include <pulsar/ServiceInfo.h>
+
 #include <cstdint>
 
 #include "ClientImpl.h"
@@ -67,7 +69,7 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
     Result sendHTTPRequest(const std::string& completeUrl, std::string& 
responseData, long& responseCode);
 
    public:
-    HTTPLookupService(const std::string&, const ClientConfiguration&, const 
AuthenticationPtr&);
+    HTTPLookupService(const ServiceInfo& serviceInfo, const 
ClientConfiguration& config);
 
     LookupResultFuture getBroker(const TopicName& topicName) override;
 
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 0799eb6..a569978 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -44,20 +44,19 @@ using std::chrono::seconds;
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, 
const TopicNamePtr& topicName,
                                                  int numPartitions, const 
std::string& subscriptionName,
                                                  const ConsumerConfiguration& 
conf,
-                                                 const LookupServicePtr& 
lookupServicePtr,
                                                  const 
ConsumerInterceptorsPtr& interceptors,
                                                  Commands::SubscriptionMode 
subscriptionMode,
                                                  const optional<MessageId>& 
startMessageId)
     : MultiTopicsConsumerImpl(client, {topicName->toString()}, 
subscriptionName, topicName, conf,
-                              lookupServicePtr, interceptors, 
subscriptionMode, startMessageId) {
+                              interceptors, subscriptionMode, startMessageId) {
     topicsPartitions_[topicName->toString()] = numPartitions;
 }
 
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
     const ClientImplPtr& client, const std::vector<std::string>& topics, const 
std::string& subscriptionName,
     const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
-    const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& 
interceptors,
-    Commands::SubscriptionMode subscriptionMode, const optional<MessageId>& 
startMessageId)
+    const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode 
subscriptionMode,
+    const optional<MessageId>& startMessageId)
     : ConsumerImplBase(client, topicName ? topicName->toString() : 
"EmptyTopics",
                        Backoff(milliseconds(100), seconds(60), 
milliseconds(0)), conf,
                        client->getListenerExecutorProvider()->get()),
@@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
       conf_(conf),
       incomingMessages_(conf.getReceiverQueueSize()),
       messageListener_(conf.getMessageListener()),
-      lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
       topics_(topics),
       subscriptionMode_(subscriptionMode),
@@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
     if (partitionsUpdateInterval > 0) {
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client->getLookup();
     }
 
     state_ = Pending;
@@ -185,7 +182,12 @@ Future<Result, Consumer> 
MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
     auto entry = topicsPartitions_.find(topic);
     if (entry == topicsPartitions_.end()) {
         lock.unlock();
-        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        auto client = client_.lock();
+        if (!client) {
+            topicPromise->setFailed(ResultAlreadyClosed);
+            return topicPromise->getFuture();
+        }
+        client->getPartitionMetadataAsync(topicName).addListener(
             [this, topicName, topicPromise](Result result, const 
LookupDataResultPtr& lookupDataResult) {
                 if (result != ResultOk) {
                     LOG_ERROR("Error Checking/Getting Partition Metadata while 
MultiTopics Subscribing- "
@@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
         auto topicName = TopicName::get(item.first);
         auto currentNumPartitions = item.second;
         auto weakSelf = weak_from_this();
-        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        auto client = client_.lock();
+        if (!client) {
+            return;
+        }
+        client->getPartitionMetadataAsync(topicName).addListener(
             [this, weakSelf, topicName, currentNumPartitions](Result result,
                                                               const 
LookupDataResultPtr& lookupDataResult) {
                 auto self = weakSelf.lock();
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index dc62865..38a44cd 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl;
 using MultiTopicsBrokerConsumerStatsPtr = 
std::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>;
 class UnAckedMessageTrackerInterface;
 using UnAckedMessageTrackerPtr = 
std::shared_ptr<UnAckedMessageTrackerInterface>;
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
 
 class MultiTopicsConsumerImpl;
 class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
     MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr& 
topicName, int numPartitions,
                             const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
-                            const LookupServicePtr& lookupServicePtr,
                             const ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
                             const optional<MessageId>& startMessageId = 
optional<MessageId>{});
 
     MultiTopicsConsumerImpl(const ClientImplPtr& client, const 
std::vector<std::string>& topics,
                             const std::string& subscriptionName, const 
TopicNamePtr& topicName,
-                            const ConsumerConfiguration& conf, const 
LookupServicePtr& lookupServicePtr_,
-                            const ConsumerInterceptorsPtr& interceptors,
+                            const ConsumerConfiguration& conf, const 
ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
                             const optional<MessageId>& startMessageId = 
optional<MessageId>{});
 
@@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     MessageListener messageListener_;
     DeadlineTimerPtr partitionsUpdateTimer_;
     TimeDuration partitionsUpdateInterval_;
-    LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
     std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> 
multiTopicsConsumerCreatedPromise_;
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 4a92366..1aa5c87 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -23,7 +23,6 @@
 #include "ClientImpl.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
-#include "LookupService.h"
 #include "ProducerImpl.h"
 #include "RoundRobinMessageRouter.h"
 #include "SinglePartitionMessageRouter.h"
@@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const 
ClientImplPtr& client, co
         listenerExecutor_ = client->getListenerExecutorProvider()->get();
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = 
std::chrono::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client->getLookup();
     }
 }
 
@@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() {
 void PartitionedProducerImpl::getPartitionMetadata() {
     using namespace std::placeholders;
     auto weakSelf = weak_from_this();
-    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+    auto client = client_.lock();
+    if (!client) {
+        return;
+    }
+    client->getPartitionMetadataAsync(topicName_)
         .addListener([weakSelf](Result result, const LookupDataResultPtr& 
lookupDataResult) {
             auto self = weakSelf.lock();
             if (self) {
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 40f2d34..94ba717 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr<ClientImpl>;
 using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
 class ProducerImpl;
 using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
 class TopicName;
@@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
     ExecutorServicePtr listenerExecutor_;
     DeadlineTimerPtr partitionsUpdateTimer_;
     TimeDuration partitionsUpdateInterval_;
-    LookupServicePtr lookupServicePtr_;
 
     ProducerInterceptorsPtr interceptors_;
 
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc 
b/lib/PatternMultiTopicsConsumerImpl.cc
index fd48fee..4b5aab7 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -21,7 +21,6 @@
 #include "ClientImpl.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
-#include "LookupService.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -32,10 +31,8 @@ using std::chrono::seconds;
 PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
     const ClientImplPtr& client, const std::string& pattern, 
CommandGetTopicsOfNamespace_Mode getTopicsMode,
     const std::vector<std::string>& topics, const std::string& 
subscriptionName,
-    const ConsumerConfiguration& conf, const LookupServicePtr& 
lookupServicePtr_,
-    const ConsumerInterceptorsPtr& interceptors)
-    : MultiTopicsConsumerImpl(client, topics, subscriptionName, 
TopicName::get(pattern), conf,
-                              lookupServicePtr_, interceptors),
+    const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr& 
interceptors)
+    : MultiTopicsConsumerImpl(client, topics, subscriptionName, 
TopicName::get(pattern), conf, interceptors),
       patternString_(pattern),
       
pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))),
       getTopicsMode_(getTopicsMode),
@@ -84,7 +81,11 @@ void 
PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er
     // already get namespace from pattern.
     assert(namespaceName_);
 
-    lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, 
getTopicsMode_)
+    auto client = client_.lock();
+    if (!client) {
+        return;
+    }
+    client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_)
         
.addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace,
 this,
                                std::placeholders::_1, std::placeholders::_2));
 }
diff --git a/lib/PatternMultiTopicsConsumerImpl.h 
b/lib/PatternMultiTopicsConsumerImpl.h
index 6352796..796abcc 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
                                    CommandGetTopicsOfNamespace_Mode 
getTopicsMode,
                                    const std::vector<std::string>& topics,
                                    const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
-                                   const LookupServicePtr& lookupServicePtr_,
                                    const ConsumerInterceptorsPtr& 
interceptors);
     ~PatternMultiTopicsConsumerImpl() override;
 
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index 7fa7e8b..754137c 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId,
     if (partitions_ > 0) {
         auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
             client_.lock(), TopicName::get(topic_), partitions_, subscription, 
consumerConf,
-            client_.lock()->getLookup(),
             
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
             Commands::SubscriptionModeNonDurable, startMessageId);
         consumer_ = consumerImpl;
diff --git a/lib/ServiceInfo.cc b/lib/ServiceInfo.cc
new file mode 100644
index 0000000..642b39a
--- /dev/null
+++ b/lib/ServiceInfo.cc
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/ServiceInfo.h>
+
+#include <utility>
+
+#include "ServiceNameResolver.h"
+#include "ServiceURI.h"
+
+namespace pulsar {
+
+ServiceInfo::ServiceInfo(std::string serviceUrl, AuthenticationPtr 
authentication,
+                         std::optional<std::string> tlsTrustCertsFilePath)
+    : serviceUrl_(std::move(serviceUrl)),
+      useTls_(ServiceNameResolver::useTls(ServiceURI(serviceUrl_))),
+      authentication_(std::move(authentication)),
+      tlsTrustCertsFilePath_(std::move(tlsTrustCertsFilePath)) {}
+
+}  // namespace pulsar
diff --git a/lib/c/c_ClientConfiguration.cc b/lib/c/c_ClientConfiguration.cc
index 483c74e..6b11a2a 100644
--- a/lib/c/c_ClientConfiguration.cc
+++ b/lib/c/c_ClientConfiguration.cc
@@ -115,14 +115,6 @@ void 
pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *con
     conf->conf.setLogger(new PulsarCLoggerFactory(logger));
 }
 
-void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t 
*conf, int useTls) {
-    conf->conf.setUseTls(useTls);
-}
-
-int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t 
*conf) {
-    return conf->conf.isUseTls();
-}
-
 void 
pulsar_client_configuration_set_validate_hostname(pulsar_client_configuration_t 
*conf,
                                                        int validateHostName) {
     conf->conf.setValidateHostName(validateHostName);
@@ -155,10 +147,6 @@ void 
pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_con
     conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
 }
 
-const char 
*pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t
 *conf) {
-    return conf->conf.getTlsTrustCertsFilePath().c_str();
-}
-
 void 
pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t
 *conf,
                                                                    int 
allowInsecure) {
     conf->conf.setTlsAllowInsecureConnection(allowInsecure);
diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc
index 7a707a1..88c4f5d 100644
--- a/perf/PerfConsumer.cc
+++ b/perf/PerfConsumer.cc
@@ -57,7 +57,6 @@ static int64_t currentTimeMillis() {
 struct Arguments {
     std::string authParams;
     std::string authPlugin;
-    bool isUseTls;
     bool isTlsAllowInsecureConnection;
     std::string tlsTrustCertsFilePath;
     std::string topic;
@@ -155,7 +154,6 @@ void handleSubscribe(Result result, Consumer consumer, 
Latch latch) {
 void startPerfConsumer(const Arguments& args) {
     ClientConfiguration conf;
 
-    conf.setUseTls(args.isUseTls);
     conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
     if (!args.tlsTrustCertsFilePath.empty()) {
         std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
@@ -262,9 +260,6 @@ int main(int argc, char** argv) {
         ("auth-plugin,a", 
po::value<std::string>(&args.authPlugin)->default_value(""),
          "Authentication plugin class library path")  //
 
-        ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
-         "Whether tls connection is used")  //
-
         ("allow-insecure,d", 
po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
          "Whether insecure tls connection is allowed")  //
 
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index 17e70cd..784c651 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -47,7 +47,6 @@ typedef std::shared_ptr<pulsar::RateLimiter> RateLimiterPtr;
 struct Arguments {
     std::string authParams;
     std::string authPlugin;
-    bool isUseTls;
     bool isTlsAllowInsecureConnection;
     std::string tlsTrustCertsFilePath;
     std::string topic;
@@ -223,9 +222,6 @@ int main(int argc, char** argv) {
         ("auth-plugin,a", 
po::value<std::string>(&args.authPlugin)->default_value(""),
          "Authentication plugin class library path")  //
 
-        ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
-         "Whether tls connection is used")  //
-
         ("allow-insecure,d", 
po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
          "Whether insecure tls connection is allowed")  //
 
@@ -366,7 +362,6 @@ int main(int argc, char** argv) {
     pulsar::ClientConfiguration conf;
     conf.setConnectionsPerBroker(args.connectionsPerBroker);
     conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
-    conf.setUseTls(args.isUseTls);
     conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
     if (!args.tlsTrustCertsFilePath.empty()) {
         std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index 84e8572..4bfd808 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -42,7 +42,7 @@ static const std::string serviceUrlHttp = 
"http://localhost:8080";;
 
 static const std::string tokenPath = TOKEN_PATH;
 
-static std::string getToken() {
+std::string getToken() {
     std::ifstream file(tokenPath);
     std::string str((std::istreambuf_iterator<char>(file)), 
std::istreambuf_iterator<char>());
     return str;
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index d3c6e61..df2c161 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -700,7 +700,8 @@ TEST(BasicEndToEndTest, testConfigurationFile) {
 
     ClientConfiguration config2 = config1;
     AuthenticationDataPtr authData;
-    ASSERT_EQ(ResultOk, config1.getAuth().getAuthData(authData));
+    Client client(lookupUrl, config1);
+    ASSERT_EQ(ResultOk, 
client.getServiceInfo().authentication()->getAuthData(authData));
     ASSERT_EQ(100, config2.getOperationTimeoutSeconds());
     ASSERT_EQ(10, config2.getIOThreads());
     ASSERT_EQ(1, config2.getMessageListenerThreads());
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 92aa820..53cb76e 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 #include <pulsar/Authentication.h>
 #include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
 
 #include <algorithm>
 #include <boost/exception/all.hpp>
@@ -30,6 +31,7 @@
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
 #include "PulsarWrapper.h"
+#include "lib/AtomicSharedPtr.h"
 #include "lib/BinaryProtoLookupService.h"
 #include "lib/ClientConnection.h"
 #include "lib/ConnectionPool.h"
@@ -79,11 +81,12 @@ using namespace pulsar;
 
 TEST(LookupServiceTest, basicLookup) {
     ExecutorServiceProviderPtr service = 
std::make_shared<ExecutorServiceProvider>(1);
-    AuthenticationPtr authData = AuthFactory::Disabled();
     std::string url = "pulsar://localhost:6650";
     ClientConfiguration conf;
     ExecutorServiceProviderPtr 
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
-    ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(url));
+    ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, "");
     BinaryProtoLookupService lookupService(url, pool_, conf);
 
     TopicNamePtr topicName = TopicName::get("topic");
@@ -146,24 +149,30 @@ static void testMultiAddresses(LookupService& 
lookupService) {
 }
 
 TEST(LookupServiceTest, testMultiAddresses) {
-    ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), 
AuthFactory::Disabled(), "");
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+    ConnectionPool pool(serviceInfo, {}, 
std::make_shared<ExecutorServiceProvider>(1), "");
     ClientConfiguration conf;
-    BinaryProtoLookupService 
binaryLookupService("pulsar://localhost,localhost:9999", pool, conf);
+    BinaryProtoLookupService 
binaryLookupService(ServiceInfo{"pulsar://localhost,localhost:9999"}, pool,
+                                                 conf);
     testMultiAddresses(binaryLookupService);
 
     // HTTPLookupService calls shared_from_this() internally, we must create a 
shared pointer to test
     auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
-        "http://localhost,localhost:9999";, ClientConfiguration{}, 
AuthFactory::Disabled());
+        ServiceInfo{"http://localhost,localhost:9999"}, ClientConfiguration{});
     testMultiAddresses(*httpLookupServicePtr);
 }
 TEST(LookupServiceTest, testRetry) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
-    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+    ConnectionPool pool(serviceInfo, {}, executorProvider, "");
     ClientConfiguration conf;
 
-    auto lookupService = RetryableLookupService::create(
-        
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9999,localhost", 
pool, conf),
-        std::chrono::seconds(30), executorProvider);
+    auto lookupService =
+        
RetryableLookupService::create(std::make_shared<BinaryProtoLookupService>(
+                                           
ServiceInfo{"pulsar://localhost:9999,localhost"}, pool, conf),
+                                       std::chrono::seconds(30), 
executorProvider);
     ServiceNameResolver& serviceNameResolver = 
lookupService->getServiceNameResolver();
 
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -192,13 +201,17 @@ TEST(LookupServiceTest, testRetry) {
 
 TEST(LookupServiceTest, testTimeout) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
-    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+    ConnectionPool pool(serviceInfo, {}, executorProvider, "");
     ClientConfiguration conf;
 
     constexpr int timeoutInSeconds = 2;
     auto lookupService = RetryableLookupService::create(
-        
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9990,localhost:9902,localhost:9904",
-                                                   pool, conf),
+        std::make_shared<BinaryProtoLookupService>(
+            
ServiceInfo{"pulsar://localhost:9990,localhost:9902,localhost:9904", 
AuthFactory::Disabled(),
+                        std::nullopt},
+            pool, conf),
         std::chrono::seconds(timeoutInSeconds), executorProvider);
     auto topicNamePtr = TopicName::get("lookup-service-test-retry");
 
@@ -259,7 +272,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
     ASSERT_EQ(ResultOk, result);
 
     // 2. verify getTopicsOfNamespace by regex mode.
-    auto lookupServicePtr = 
PulsarFriend::getClientImplPtr(client_)->getLookup();
+    auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_);
     auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode,
                                const std::set<std::string>& expectedTopics) {
         Future<Result, NamespaceTopicsPtr> getTopicsFuture =
@@ -292,11 +305,8 @@ TEST_P(LookupServiceTest, testGetSchema) {
     Producer producer;
     ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, 
producer));
 
-    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
-    auto lookup = clientImplPtr->getLookup();
-
     SchemaInfo schemaInfo;
-    auto future = lookup->getSchema(TopicName::get(topic));
+    auto future = 
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
     ASSERT_EQ(ResultOk, future.get(schemaInfo));
     ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
     ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
@@ -310,11 +320,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) {
     Producer producer;
     ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
 
-    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
-    auto lookup = clientImplPtr->getLookup();
-
     SchemaInfo schemaInfo;
-    auto future = lookup->getSchema(TopicName::get(topic));
+    auto future = 
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
     ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
 }
 
@@ -335,11 +342,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) {
     Producer producer;
     ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, 
producer));
 
-    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
-    auto lookup = clientImplPtr->getLookup();
-
     SchemaInfo schemaInfo;
-    auto future = lookup->getSchema(TopicName::get(topic));
+    auto future = 
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
     ASSERT_EQ(ResultOk, future.get(schemaInfo));
     ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
     ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
@@ -464,9 +468,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, 
::testing::Values(binaryLook
 
 class BinaryProtoLookupServiceRedirectTestHelper : public 
BinaryProtoLookupService {
    public:
-    BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl, 
ConnectionPool& pool,
+    BinaryProtoLookupServiceRedirectTestHelper(const ServiceInfo& serviceInfo, 
ConnectionPool& pool,
                                                const ClientConfiguration& 
clientConfiguration)
-        : BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {}
+        : BinaryProtoLookupService(serviceInfo, pool, clientConfiguration) {}
 
     LookupResultFuture findBroker(const std::string& address, bool 
authoritative, const std::string& topic,
                                   size_t redirectCount) {
@@ -476,13 +480,14 @@ class BinaryProtoLookupServiceRedirectTestHelper : public 
BinaryProtoLookupServi
 
 TEST(LookupServiceTest, testRedirectionLimit) {
     const auto redirect_limit = 5;
-    AuthenticationPtr authData = AuthFactory::Disabled();
     ClientConfiguration conf;
     conf.setMaxLookupRedirects(redirect_limit);
     ExecutorServiceProviderPtr 
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
-    ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
-    string url = "pulsar://localhost:6650";
-    BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf);
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+    ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, "");
+    const ServiceInfo lookupServiceInfo{"pulsar://localhost:6650"};
+    BinaryProtoLookupServiceRedirectTestHelper 
lookupService(lookupServiceInfo, pool_, conf);
 
     const auto topicNamePtr = TopicName::get("topic");
     for (auto idx = 0; idx < redirect_limit + 5; ++idx) {
@@ -493,8 +498,8 @@ TEST(LookupServiceTest, testRedirectionLimit) {
 
         if (idx <= redirect_limit) {
             ASSERT_EQ(ResultOk, result);
-            ASSERT_EQ(url, lookupResult.logicalAddress);
-            ASSERT_EQ(url, lookupResult.physicalAddress);
+            ASSERT_EQ(lookupServiceInfo.serviceUrl(), 
lookupResult.logicalAddress);
+            ASSERT_EQ(lookupServiceInfo.serviceUrl(), 
lookupResult.physicalAddress);
         } else {
             ASSERT_EQ(ResultTooManyLookupRequestException, result);
         }
@@ -522,12 +527,12 @@ class MockLookupService : public BinaryProtoLookupService 
{
 };
 
 TEST(LookupServiceTest, testAfterClientShutdown) {
-    auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", 
ClientConfiguration{},
-                                               [](const std::string& 
serviceUrl, const ClientConfiguration&,
-                                                  ConnectionPool& pool, const 
AuthenticationPtr&) {
-                                                   return 
std::make_shared<MockLookupService>(
-                                                       serviceUrl, pool, 
ClientConfiguration{});
-                                               });
+    auto client = std::make_shared<ClientImpl>(
+        "pulsar://localhost:6650", ClientConfiguration{},
+        [](const ServiceInfo& serviceInfo, const ClientConfiguration&, 
ConnectionPool& pool) {
+            return std::make_shared<MockLookupService>(serviceInfo, pool, 
ClientConfiguration{});
+        });
+
     std::promise<Result> promise;
     client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", 
ConsumerConfiguration{},
                            [&promise](Result result, const Consumer&) { 
promise.set_value(result); });
@@ -545,10 +550,12 @@ TEST(LookupServiceTest, testAfterClientShutdown) {
 
 TEST(LookupServiceTest, testRetryAfterDestroyed) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
-    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+    AtomicSharedPtr<ServiceInfo> serviceInfo;
+    serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+    ConnectionPool pool(serviceInfo, {}, executorProvider, "");
 
-    auto internalLookupService =
-        std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, 
ClientConfiguration{});
+    auto internalLookupService = 
std::make_shared<MockLookupService>(ServiceInfo{"pulsar://localhost:6650"},
+                                                                     pool, 
ClientConfiguration{});
     auto lookupService =
         RetryableLookupService::create(internalLookupService, 
std::chrono::seconds(30), executorProvider);
 
diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc
new file mode 100644
index 0000000..82f5f6f
--- /dev/null
+++ b/tests/ServiceInfoProviderTest.cc
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <thread>
+
+#include "PulsarFriend.h"
+#include "WaitUtils.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+using namespace std::chrono_literals;
+
+class ServiceInfoHolder {
+   public:
+    ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
+
+    std::optional<ServiceInfo> getUpdatedValue() {
+        std::lock_guard lock(mutex_);
+        if (!owned_) {
+            return std::nullopt;
+        }
+        owned_ = false;
+        return std::move(serviceInfo_);
+    }
+
+    void updateValue(ServiceInfo info) {
+        std::lock_guard lock(mutex_);
+        serviceInfo_ = std::move(info);
+        owned_ = true;
+    }
+
+   private:
+    ServiceInfo serviceInfo_;
+    bool owned_{true};
+
+    mutable std::mutex mutex_;
+};
+
+class TestServiceInfoProvider : public ServiceInfoProvider {
+   public:
+    TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) : 
serviceInfo_(serviceInfo) {}
+
+    ServiceInfo initialServiceInfo() override { return 
serviceInfo_.getUpdatedValue().value(); }
+
+    void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) 
override {
+        thread_ = std::thread([this, onServiceInfoUpdate] {
+            while (running_) {
+                auto updatedValue = serviceInfo_.getUpdatedValue();
+                if (updatedValue) {
+                    onServiceInfoUpdate(std::move(*updatedValue));
+                }
+                // Use a tight wait loop for tests
+                std::this_thread::sleep_for(10ms);
+            }
+        });
+    }
+
+    ~TestServiceInfoProvider() override {
+        running_ = false;
+        if (thread_.joinable()) {
+            thread_.join();
+        }
+    }
+
+   private:
+    std::thread thread_;
+    ServiceInfoHolder &serviceInfo_;
+    std::atomic_bool running_{true};
+    mutable std::mutex mutex_;
+};
+
+TEST(ServiceInfoProviderTest, testSwitchCluster) {
+    extern std::string getToken();  // from tests/AuthTokenTest.cc
+    // Access "private/auth" namespace in cluster 1
+    ServiceInfo info1{"pulsar://localhost:6650", 
AuthToken::createWithToken(getToken())};
+    // Access "private/auth" namespace in cluster 2
+    ServiceInfo info2{"pulsar+ssl://localhost:6653",
+                      AuthTls::create(TEST_CONF_DIR "/client-cert.pem", 
TEST_CONF_DIR "/client-key.pem"),
+                      TEST_CONF_DIR "/hn-verification/cacert.pem"};
+    // Access "public/default" namespace in cluster 1, which doesn't require 
authentication
+    ServiceInfo info3{"pulsar://localhost:6650"};
+
+    ServiceInfoHolder serviceInfo{info1};
+    auto client = 
Client::create(std::make_unique<TestServiceInfoProvider>(serviceInfo), {});
+
+    const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + 
std::to_string(time(nullptr));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer));
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, 
MessageId::earliest(), {}, reader));
+
+    auto sendAndReceive = [&](const std::string &value) {
+        MessageId msgId;
+        ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(value).build(), msgId));
+        LOG_INFO("Sent " << value << " to " << msgId);
+
+        Message msg;
+        ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+        LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId);
+        ASSERT_EQ(value, msg.getDataAsString());
+    };
+
+    sendAndReceive("msg-0");
+
+    // Switch to cluster 2 (started by 
./build-support/start-mim-test-service-inside-container.sh)
+    ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
+    serviceInfo.updateValue(info2);
+    ASSERT_TRUE(waitUntil(1s, [&] {
+        return PulsarFriend::getConnections(client).empty() && 
client.getServiceInfo() == info2;
+    }));
+
+    // Now the same will access the same topic in cluster 2
+    sendAndReceive("msg-1");
+
+    // Switch back to cluster 1 without any authentication, the previous 
authentication info configured for
+    // cluster 2 will be cleared.
+    ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
+    serviceInfo.updateValue(info3);
+    ASSERT_TRUE(waitUntil(1s, [&] {
+        return PulsarFriend::getConnections(client).empty() && 
client.getServiceInfo() == info3;
+    }));
+
+    const auto topicNoAuth = "testUpdateConnectionInfo-" + 
std::to_string(time(nullptr));
+    producer.close();
+    ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer));
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("msg-2").build()));
+
+    client.close();
+
+    // Verify messages sent to cluster 1 and cluster 2 can be consumed 
successfully with correct
+    // authentication info.
+    auto verify = [](Client &client, const std::string &topic, const 
std::string &value) {
+        Reader reader;
+        ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), 
{}, reader));
+        Message msg;
+        ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+        ASSERT_EQ(value, msg.getDataAsString());
+    };
+    Client client1{info1.serviceUrl(), 
ClientConfiguration().setAuth(info1.authentication())};
+    verify(client1, topicRequiredAuth, "msg-0");
+    client1.close();
+
+    Client client2{info2.serviceUrl(), ClientConfiguration()
+                                           .setAuth(info2.authentication())
+                                           
.setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath())};
+    verify(client2, topicRequiredAuth, "msg-1");
+    client2.close();
+
+    Client client3{info3.serviceUrl()};
+    verify(client3, topicNoAuth, "msg-2");
+    client3.close();
+}

Reply via email to