Copilot commented on code in PR #541:
URL: https://github.com/apache/pulsar-client-cpp/pull/541#discussion_r2889967375
##########
tests/ClientTest.cc:
##########
@@ -506,3 +508,86 @@ TEST(ClientTest, testNoRetry) {
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
}
}
+
+TEST(ClientTest, testUpdateServiceInfo) {
+ extern std::string getToken(); // from AuthToken.cc
Review Comment:
The comment says `getToken()` is from "AuthToken.cc", but the symbol is
defined in `tests/AuthTokenTest.cc` (and this test relies on that translation
unit being linked). Updating the comment (and/or moving `getToken()` to a
shared test helper) would reduce confusion.
```suggestion
extern std::string getToken(); // from tests/AuthTokenTest.cc
```
##########
lib/ClientImpl.h:
##########
@@ -139,6 +138,26 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ConnectionPool& getConnectionPool() noexcept { return pool_; }
uint64_t getLookupCount() { return lookupCount_; }
+ void updateServiceInfo(ServiceInfo&& serviceInfo);
+ ServiceInfo getServiceInfo() const;
+
Review Comment:
`updateServiceInfo()` implies `clientConfiguration_` can now change at
runtime, but `conf()` exposes it as a `const ClientConfiguration&` and many
call sites read it without holding `ClientImpl::mutex_`. That introduces a
potential data race between `updateServiceInfo()` writes (auth/TLS/useTls) and
concurrent reads (timeouts, partitions update interval, etc.). Consider making
configuration access thread-safe (e.g., return a copy/snapshot, guard reads
with the shared_mutex for the full read, or move mutable fields out of
`clientConfiguration_` into an atomically replaceable structure).
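One way to picture the "atomically replaceable structure" option is the sketch below. It is only an illustration under stated assumptions: `ConnectionSettings` and `SettingsHolder` are hypothetical names that do not exist in pulsar-client-cpp, and the real `ClientConfiguration` carries many more fields. Writers build a complete new immutable snapshot and swap it in under an exclusive lock; readers copy the `shared_ptr` under a shared lock and then read from their snapshot without holding any lock.

```cpp
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the fields that updateServiceInfo() mutates.
struct ConnectionSettings {
    std::string serviceUrl;
    std::string tlsTrustCertsFilePath;
    bool useTls = false;
};

// Hypothetical holder: each update publishes a new immutable snapshot.
class SettingsHolder {
   public:
    explicit SettingsHolder(ConnectionSettings initial)
        : current_(std::make_shared<const ConnectionSettings>(std::move(initial))) {}

    // Readers hold the shared lock only long enough to copy the pointer.
    std::shared_ptr<const ConnectionSettings> snapshot() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        return current_;
    }

    // Writers construct the new snapshot outside the lock, then swap it in.
    void replace(ConnectionSettings next) {
        auto fresh = std::make_shared<const ConnectionSettings>(std::move(next));
        std::unique_lock<std::shared_mutex> lock(mutex_);
        current_ = std::move(fresh);
    }

   private:
    mutable std::shared_mutex mutex_;
    std::shared_ptr<const ConnectionSettings> current_;
};
```

A caller that took a snapshot before an update keeps a consistent view of the old settings, so it never observes a half-applied mix of old auth and new TLS values.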
##########
lib/ClientImpl.cc:
##########
@@ -854,4 +873,42 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
return clientConfiguration.impl_->operationTimeout;
}
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+ std::unique_lock lock(mutex_);
+ if (state_ != Open) {
+ LOG_ERROR("Client is not open, cannot update connection info");
+ return;
+ }
+
+ if (serviceInfo.authentication.has_value() && *serviceInfo.authentication) {
+ clientConfiguration_.setAuth(*serviceInfo.authentication);
+ } else {
+ clientConfiguration_.setAuth(AuthFactory::Disabled());
+ }
+ if (serviceInfo.tlsTrustCertsFilePath.has_value()) {
+ clientConfiguration_.setTlsTrustCertsFilePath(*serviceInfo.tlsTrustCertsFilePath);
+ } else {
+ clientConfiguration_.setTlsTrustCertsFilePath("");
+ }
+ clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceInfo.serviceUrl)));
+ serviceInfo_ = {serviceInfo.serviceUrl, toOptionalAuthentication(clientConfiguration_.getAuthPtr()),
+ clientConfiguration_.getTlsTrustCertsFilePath().empty()
+ ? std::nullopt
+ : std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())};
+
+ pool_.resetForClusterSwitching(clientConfiguration_.getAuthPtr(), clientConfiguration_);
+
+ lookupServicePtr_->close();
+ for (auto&& it : redirectedClusterLookupServicePtrs_) {
+ it.second->close();
+ }
+ redirectedClusterLookupServicePtrs_.clear();
+ lookupServicePtr_ = createLookup(serviceInfo.serviceUrl);
+}
Review Comment:
`updateServiceInfo()` rebuilds the lookup service and clears redirected
lookup caches, but it doesn’t reset cluster-specific state like `useProxy_` and
`lookupCount_`. These fields are set from lookup results on the previous
cluster and can affect how future connections resolve/choose addresses (e.g.,
`getPhysicalAddress()` / `HandlerBase::getConnection`). Consider resetting them
when switching clusters so behavior is derived from the new cluster.
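As a rough illustration of the reset being suggested, the sketch below groups the cluster-derived fields into one hypothetical struct with an explicit reset method. `ClusterDerivedState`, `recordLookup()`, and `resetForNewCluster()` are made-up names, and the atomic types and zero/false defaults are assumptions; the actual declarations of `useProxy_` and `lookupCount_` in `ClientImpl` may differ.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical grouping of state that is learned from lookup results
// and therefore only valid for the cluster that produced it.
struct ClusterDerivedState {
    std::atomic<bool> useProxy{false};
    std::atomic<std::uint64_t> lookupCount{0};

    // Called after a lookup completes on the current cluster.
    void recordLookup(bool viaProxy) {
        useProxy.store(viaProxy);
        lookupCount.fetch_add(1);
    }

    // Called when the service info changes, so that later decisions
    // are based only on lookups against the new cluster.
    void resetForNewCluster() {
        useProxy.store(false);
        lookupCount.store(0);
    }
};
```

Keeping such fields together makes it harder to forget one of them when another piece of per-cluster state is added later.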
##########
lib/ClientImpl.cc:
##########
@@ -854,4 +873,42 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
return clientConfiguration.impl_->operationTimeout;
}
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+ std::unique_lock lock(mutex_);
+ if (state_ != Open) {
+ LOG_ERROR("Client is not open, cannot update connection info");
Review Comment:
The log message says "cannot update connection info" but the API/method is
`updateServiceInfo`. Aligning the wording (and potentially including the new
service URL) would make troubleshooting clearer.
```suggestion
LOG_ERROR("Client is not open, cannot update service info for serviceUrl '%s'",
serviceInfo.serviceUrl.c_str());
```
##########
lib/ConsumerImpl.cc:
##########
@@ -1134,6 +1135,16 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
}
}
+void ConsumerImpl::onClusterSwitching() {
+ {
+ LockGuard lock{mutex_};
+ incomingMessages_.clear();
+ startMessageId_ = startMessageIdFromConfig_;
+ lastDequedMessageId_ = MessageId::earliest();
Review Comment:
`onClusterSwitching()` resets `startMessageId_` and `lastDequedMessageId_`
but leaves `lastMessageIdInBroker_` unchanged. This value is consulted by
`hasMoreMessages()`/`hasMessageAvailableAsync()`, so after switching clusters
it can cause incorrect availability checks based on the old cluster’s last
message id. Consider resetting `lastMessageIdInBroker_` (and any other
cluster-specific cursor state such as seek-related fields if applicable) during
cluster switching.
```suggestion
lastDequedMessageId_ = MessageId::earliest();
lastMessageIdInBroker_ = MessageId::earliest();
seekStatus_ = SeekStatus::NOT_STARTED;
lastSeekArg_.reset();
```