Copilot commented on code in PR #541:
URL: https://github.com/apache/pulsar-client-cpp/pull/541#discussion_r2889967375


##########
tests/ClientTest.cc:
##########
@@ -506,3 +508,86 @@ TEST(ClientTest, testNoRetry) {
         ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
     }
 }
+
+TEST(ClientTest, testUpdateServiceInfo) {
+    extern std::string getToken();  // from AuthToken.cc

Review Comment:
   The comment says `getToken()` is from "AuthToken.cc", but the symbol is 
defined in `tests/AuthTokenTest.cc` (and this test relies on that translation 
unit being linked). Updating the comment (and/or moving `getToken()` to a 
shared test helper) would reduce confusion.
   ```suggestion
       extern std::string getToken();  // from tests/AuthTokenTest.cc
   ```



##########
lib/ClientImpl.h:
##########
@@ -139,6 +138,26 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     ConnectionPool& getConnectionPool() noexcept { return pool_; }
     uint64_t getLookupCount() { return lookupCount_; }
 
+    void updateServiceInfo(ServiceInfo&& serviceInfo);
+    ServiceInfo getServiceInfo() const;
+

Review Comment:
   `updateServiceInfo()` implies `clientConfiguration_` can now change at 
runtime, but `conf()` exposes it as a `const ClientConfiguration&` and many 
call sites read it without holding `ClientImpl::mutex_`. That introduces a 
potential data race between `updateServiceInfo()` writes (auth/TLS/useTls) and 
concurrent reads (timeouts, partitions update interval, etc.). Consider making 
configuration access thread-safe (e.g., return a copy/snapshot, guard reads 
with the shared_mutex for the full read, or move mutable fields out of 
`clientConfiguration_` into an atomically replaceable structure).
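   One way to picture the "atomically replaceable structure" option is the
   minimal sketch below. `MutableServiceConfig` and `ConfigHolder` are
   hypothetical names used only for illustration; they are not part of the
   Pulsar client, and the real set of mutable fields may differ. Readers take
   an immutable snapshot under a shared lock, and the writer swaps in a fully
   built replacement, so no reader ever sees a half-updated configuration.

   ```cpp
   #include <memory>
   #include <mutex>
   #include <shared_mutex>
   #include <string>
   #include <utility>

   // Hypothetical stand-in for the fields that updateServiceInfo() may change.
   struct MutableServiceConfig {
       std::string serviceUrl;
       std::string tlsTrustCertsFilePath;
       bool useTls = false;
   };

   // Hypothetical holder: readers receive an immutable snapshot, the writer
   // replaces the whole structure in one step.
   class ConfigHolder {
      public:
       explicit ConfigHolder(MutableServiceConfig initial)
           : current_(std::make_shared<const MutableServiceConfig>(std::move(initial))) {}

       // The returned snapshot stays valid even if update() runs afterwards.
       std::shared_ptr<const MutableServiceConfig> snapshot() const {
           std::shared_lock<std::shared_mutex> lock(mutex_);
           return current_;
       }

       // Build the replacement outside the lock, then swap it in.
       // The old value is released after the lock is dropped.
       void update(MutableServiceConfig next) {
           auto replacement = std::make_shared<const MutableServiceConfig>(std::move(next));
           std::unique_lock<std::shared_mutex> lock(mutex_);
           current_.swap(replacement);
       }

      private:
       mutable std::shared_mutex mutex_;
       std::shared_ptr<const MutableServiceConfig> current_;
   };
   ```

   With this shape, a call site would hold the result of `snapshot()` for the
   duration of one operation instead of keeping a long-lived reference into
   shared state.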



##########
lib/ClientImpl.cc:
##########
@@ -854,4 +873,42 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
     return clientConfiguration.impl_->operationTimeout;
 }
 
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+    std::unique_lock lock(mutex_);
+    if (state_ != Open) {
+        LOG_ERROR("Client is not open, cannot update connection info");
+        return;
+    }
+
+    if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) {
+        clientConfiguration_.setAuth(*serviceInfo.authentication);
+    } else {
+        clientConfiguration_.setAuth(AuthFactory::Disabled());
+    }
+    if (serviceInfo.tlsTrustCertsFilePath.has_value()) {
+        clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath);
+    } else {
+        clientConfiguration_.setTlsTrustCertsFilePath("");
+    }
+    clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl)));
+    serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()),
+                    clientConfiguration_.getTlsTrustCertsFilePath().empty()
+                        ? std::nullopt
+                        : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())};
+
+    pool_.resetForClusterSwitching(clientConfiguration_.getAuthPtr(), clientConfiguration_);
+
+    lookupServicePtr_->close();
+    for (auto&& it : redirectedClusterLookupServicePtrs_) {
+        it.second->close();
+    }
+    redirectedClusterLookupServicePtrs_.clear();
+    lookupServicePtr_ = createLookup(serviceInfo.serviceUrl);
+}

Review Comment:
   `updateServiceInfo()` rebuilds the lookup service and clears redirected 
lookup caches, but it doesn’t reset cluster-specific state like `useProxy_` and 
`lookupCount_`. These fields are set from lookup results on the previous 
cluster and can affect how future connections resolve/choose addresses (e.g., 
`getPhysicalAddress()` / `HandlerBase::getConnection`). Consider resetting them 
when switching clusters so behavior is derived from the new cluster.
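   A minimal sketch of the reset idea follows, assuming the two fields can be
   grouped and stored as atomics. `ClusterDerivedState` is a hypothetical name
   for illustration; how `useProxy_` and `lookupCount_` are actually declared
   in `ClientImpl` may differ, and whether the lookup counter should be zeroed
   is a judgment call for the maintainers.

   ```cpp
   #include <atomic>
   #include <cstdint>

   // Hypothetical grouping of state learned from the previous cluster's lookups.
   struct ClusterDerivedState {
       std::atomic<bool> useProxy{false};
       std::atomic<std::uint64_t> lookupCount{0};

       // Return to the defaults so the next lookup repopulates the values
       // from the new cluster.
       void reset() noexcept {
           useProxy.store(false);
           lookupCount.store(0);
       }
   };
   ```

   Calling `reset()` in the same critical section that recreates the lookup
   service would keep the switch to the new cluster consistent.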



##########
lib/ClientImpl.cc:
##########
@@ -854,4 +873,42 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
     return clientConfiguration.impl_->operationTimeout;
 }
 
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+    std::unique_lock lock(mutex_);
+    if (state_ != Open) {
+        LOG_ERROR("Client is not open, cannot update connection info");

Review Comment:
   The log message says "cannot update connection info" but the API/method is 
`updateServiceInfo`. Aligning the wording (and potentially including the new 
service URL) would make troubleshooting clearer.
   ```suggestion
           LOG_ERROR("Client is not open, cannot update service info for serviceUrl '"
                     << serviceInfo.serviceUrl << "'");
   ```



##########
lib/ConsumerImpl.cc:
##########
@@ -1134,6 +1135,16 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
     }
 }
 
+void ConsumerImpl::onClusterSwitching() {
+    {
+        LockGuard lock{mutex_};
+        incomingMessages_.clear();
+        startMessageId_ = startMessageIdFromConfig_;
+        lastDequedMessageId_ = MessageId::earliest();

Review Comment:
   `onClusterSwitching()` resets `startMessageId_` and `lastDequedMessageId_` 
but leaves `lastMessageIdInBroker_` unchanged. This value is consulted by 
`hasMoreMessages()`/`hasMessageAvailableAsync()`, so after switching clusters 
it can cause incorrect availability checks based on the old cluster’s last 
message id. Consider resetting `lastMessageIdInBroker_` (and any other 
cluster-specific cursor state such as seek-related fields if applicable) during 
cluster switching.
   ```suggestion
           lastDequedMessageId_ = MessageId::earliest();
           lastMessageIdInBroker_ = MessageId::earliest();
           seekStatus_ = SeekStatus::NOT_STARTED;
           lastSeekArg_.reset();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
