This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a03eb92 Fix topic lookup segmentation fault after client is closed (#521)
a03eb92 is described below
commit a03eb9278bf96cbda42eb9e7dc1c73ee0b65ea3e
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Nov 6 11:49:38 2025 +0800
Fix topic lookup segmentation fault after client is closed (#521)
---
lib/ClientImpl.cc | 36 +++++++++++++--------
lib/ClientImpl.h | 8 +++++
lib/ResultUtils.h | 3 +-
lib/RetryableLookupService.h | 12 +++----
lib/RetryableOperationCache.h | 12 ++++++-
tests/LookupServiceTest.cc | 62 ++++++++++++++++++++++++++++++++++++
tests/RetryableOperationCacheTest.cc | 4 +--
7 files changed, 113 insertions(+), 24 deletions(-)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index aa75128..eec3b34 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -78,7 +78,25 @@ typedef std::unique_lock<std::mutex> Lock;
typedef std::vector<std::string> StringList;
+static LookupServicePtr defaultLookupServiceFactory(const std::string&
serviceUrl,
+ const ClientConfiguration&
clientConfiguration,
+ ConnectionPool& pool,
const AuthenticationPtr& auth) {
+ if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
+ LOG_DEBUG("Using HTTP Lookup");
+ return std::make_shared<HTTPLookupService>(serviceUrl,
std::cref(clientConfiguration),
+ std::cref(auth));
+ } else {
+ LOG_DEBUG("Using Binary Lookup");
+ return std::make_shared<BinaryProtoLookupService>(serviceUrl,
std::ref(pool),
+
std::cref(clientConfiguration));
+ }
+}
+
ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration& clientConfiguration)
+ : ClientImpl(serviceUrl, clientConfiguration,
&defaultLookupServiceFactory) {}
+
+ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration& clientConfiguration,
+ LookupServiceFactory&& lookupServiceFactory)
: mutex_(),
state_(Open),
clientConfiguration_(ClientConfiguration(clientConfiguration)
@@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
consumerIdGenerator_(0),
closingError(ResultOk),
useProxy_(false),
- lookupCount_(0L) {
+ lookupCount_(0L),
+ lookupServiceFactory_(std::move(lookupServiceFactory)) {
std::unique_ptr<LoggerFactory> loggerFactory =
clientConfiguration_.impl_->takeLogger();
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl,
const ClientConfiguration&
ClientImpl::~ClientImpl() { shutdown(); }
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
- LookupServicePtr underlyingLookupServicePtr;
- if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
- LOG_DEBUG("Using HTTP Lookup");
- underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
- serviceUrl, std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
- } else {
- LOG_DEBUG("Using Binary Lookup");
- underlyingLookupServicePtr =
std::make_shared<BinaryProtoLookupService>(
- serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
- }
-
auto lookupServicePtr = RetryableLookupService::create(
- underlyingLookupServicePtr,
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
+ lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_,
clientConfiguration_.getAuthPtr()),
+ clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}
@@ -767,6 +776,7 @@ void ClientImpl::shutdown() {
<< " consumers have been shutdown.");
}
+ lookupServicePtr_->close();
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called
before.
return;
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 000e443..0b4d596 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
class LookupService;
using LookupServicePtr = std::shared_ptr<LookupService>;
+using LookupServiceFactory = std::function<LookupServicePtr(const
std::string&, const ClientConfiguration&,
+ ConnectionPool&
pool, const AuthenticationPtr&)>;
class ProducerImplBase;
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -70,6 +72,11 @@ std::string generateRandomName();
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration);
+
+ // only for tests
+ ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
+ LookupServiceFactory&& lookupServiceFactory);
+
virtual ~ClientImpl();
/**
@@ -205,6 +212,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
std::atomic<Result> closingError;
std::atomic<bool> useProxy_;
std::atomic<uint64_t> lookupCount_;
+ LookupServiceFactory lookupServiceFactory_;
friend class Client;
};
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
index dfba7eb..cf4ff1f 100644
--- a/lib/ResultUtils.h
+++ b/lib/ResultUtils.h
@@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) {
ResultLookupError,
ResultTooManyLookupRequestException,
ResultProducerBlockedQuotaExceededException,
-
ResultProducerBlockedQuotaExceededError};
+
ResultProducerBlockedQuotaExceededError,
+ ResultAlreadyClosed};
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
}
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 8bc40bf..bbcf4f0 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,8 +18,6 @@
*/
#pragma once
-#include <chrono>
-
#include "LookupDataResult.h"
#include "LookupService.h"
#include "NamespaceName.h"
@@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService {
: RetryableLookupService(std::forward<Args>(args)...) {}
void close() override {
- lookupCache_->clear();
- partitionLookupCache_->clear();
- namespaceLookupCache_->clear();
- getSchemaCache_->clear();
+ lookupCache_->close();
+ partitionLookupCache_->close();
+ namespaceLookupCache_->close();
+ getSchemaCache_->close();
}
template <typename... Args>
@@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService {
RetryableLookupService(std::shared_ptr<LookupService> lookupService,
TimeDuration timeout,
ExecutorServiceProviderPtr executorProvider)
- : lookupService_(lookupService),
+ : lookupService_(std::move(lookupService)),
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider,
timeout)),
partitionLookupCache_(
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider,
timeout)),
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index e42460d..f2d390d 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -58,6 +58,11 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
Future<Result, T> run(const std::string& key, std::function<Future<Result,
T>()>&& func) {
std::unique_lock<std::mutex> lock{mutex_};
+ if (closed_) {
+ Promise<Result, T> promise;
+ promise.setFailed(ResultAlreadyClosed);
+ return promise.getFuture();
+ }
auto it = operations_.find(key);
if (it == operations_.end()) {
DeadlineTimerPtr timer;
@@ -92,11 +97,15 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
}
}
- void clear() {
+ void close() {
decltype(operations_) operations;
{
std::lock_guard<std::mutex> lock{mutex_};
+ if (closed_) {
+ return;
+ }
operations.swap(operations_);
+ closed_ = true;
}
// cancel() could trigger the listener to erase the key from
operations, so we should use a swap way
// to release the lock here
@@ -110,6 +119,7 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
const TimeDuration timeout_;
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>>
operations_;
+ bool closed_{false};
mutable std::mutex mutex_;
DECLARE_LOG_OBJECT()
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index ff3a7e0..92aa820 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -500,3 +500,65 @@ TEST(LookupServiceTest, testRedirectionLimit) {
}
}
}
+
+class MockLookupService : public BinaryProtoLookupService {
+ public:
+ using BinaryProtoLookupService::BinaryProtoLookupService;
+
+ Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
+ bool expected = true;
+ if (firstTime_.compare_exchange_strong(expected, false)) {
+ // Trigger the retry
+ LOG_INFO("Fail the lookup for " << topicName->toString() << "
intentionally");
+ Promise<Result, LookupDataResultPtr> promise;
+ promise.setFailed(ResultRetryable);
+ return promise.getFuture();
+ }
+ return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
+ }
+
+ private:
+ std::atomic_bool firstTime_{true};
+};
+
+TEST(LookupServiceTest, testAfterClientShutdown) {
+ auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650",
ClientConfiguration{},
+ [](const std::string&
serviceUrl, const ClientConfiguration&,
+ ConnectionPool& pool, const
AuthenticationPtr&) {
+ return
std::make_shared<MockLookupService>(
+ serviceUrl, pool,
ClientConfiguration{});
+ });
+ std::promise<Result> promise;
+ client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub",
ConsumerConfiguration{},
+ [&promise](Result result, const Consumer&) {
promise.set_value(result); });
+ // When shutdown is called, there is a pending lookup request due to the
1st lookup is failed in
+ // MockLookupService. Verify shutdown will cancel it and return
ResultDisconnected.
+ client->shutdown();
+ EXPECT_EQ(ResultDisconnected, promise.get_future().get());
+
+ // A new subscribeAsync call will fail immediately in the current thread
+ Result result = ResultOk;
+ client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub",
ConsumerConfiguration{},
+ [&result](Result innerResult, const Consumer&) {
result = innerResult; });
+ EXPECT_EQ(ResultAlreadyClosed, result);
+}
+
+TEST(LookupServiceTest, testRetryAfterDestroyed) {
+ auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+ ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+
+ auto internalLookupService =
+ std::make_shared<MockLookupService>("pulsar://localhost:6650", pool,
ClientConfiguration{});
+ auto lookupService =
+ RetryableLookupService::create(internalLookupService,
std::chrono::seconds(30), executorProvider);
+
+ // Simulate the race condition that `getPartitionMetadataAsync` is called
after `close` is called on the
+ // lookup service. It's expected the request fails immediately with
ResultAlreadyClosed.
+ lookupService->close();
+ Result result = ResultOk;
+
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
+ .addListener([&result](Result innerResult, const LookupDataResultPtr&)
{ result = innerResult; });
+ EXPECT_EQ(ResultAlreadyClosed, result);
+ pool.close();
+ executorProvider->close();
+}
diff --git a/tests/RetryableOperationCacheTest.cc
b/tests/RetryableOperationCacheTest.cc
index 2a6948e..c9b8a1d 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -118,13 +118,13 @@ TEST_F(RetryableOperationCacheTest, testTimeout) {
}
}
-TEST_F(RetryableOperationCacheTest, testClear) {
+TEST_F(RetryableOperationCacheTest, testClose) {
auto cache = RetryableOperationCache<int>::create(provider_,
std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i),
CountdownFunc{100}));
}
ASSERT_EQ(getSize(*cache), 10);
- cache->clear();
+ cache->close();
for (auto&& future : futures_) {
int value;
// All cancelled futures complete with ResultDisconnected and the
default int value