This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2606df9 Fix multi-topics consumer will crash if one internal consumer fails getBrokerConsumerStatsAsync (#538)
2606df9 is described below
commit 2606df96d9e2dbcc90ec8374ea1d4d45c147d128
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Feb 6 19:33:16 2026 +0800
Fix multi-topics consumer will crash if one internal consumer fails getBrokerConsumerStatsAsync (#538)
---
lib/ClientConnection.cc | 5 ++++
lib/ClientConnection.h | 4 +--
lib/MockServer.h | 17 +++++++----
lib/MultiTopicsConsumerImpl.cc | 63 ++++++++++++++++++++--------------------
lib/MultiTopicsConsumerImpl.h | 4 ---
tests/ConsumerTest.cc | 1 +
tests/MultiTopicsConsumerTest.cc | 30 +++++++++++++++++++
7 files changed, 81 insertions(+), 43 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 4f7a1dd..1d488d8 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -997,9 +997,14 @@ Future<Result, BrokerConsumerStatsImpl>
ClientConnection::newConsumerStats(uint6
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
+ return promise.getFuture();
}
pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
lock.unlock();
+ if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
+ mockServer_->sendRequest("CONSUMER_STATS", requestId)) {
+ return promise.getFuture();
+ }
sendCommand(Commands::newConsumerStats(consumerId, requestId));
return promise.getFuture();
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index aae53d2..b277000 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -219,6 +219,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
mockingRequests_.store(true, std::memory_order_release);
}
+ void handleKeepAliveTimeout();
+
private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
@@ -284,8 +286,6 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const
LastMessageIdRequestData& data);
- void handleKeepAliveTimeout();
-
template <typename Handler>
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
return AllocHandler<Handler>(readHandlerAllocator_, h);
diff --git a/lib/MockServer.h b/lib/MockServer.h
index bd413d3..2d830fc 100644
--- a/lib/MockServer.h
+++ b/lib/MockServer.h
@@ -75,11 +75,18 @@ class MockServer : public
std::enable_shared_from_this<MockServer> {
}
});
}
- schedule(connection, request + std::to_string(requestId),
iter->second, [connection, requestId] {
- proto::CommandSuccess success;
- success.set_request_id(requestId);
- connection->handleSuccess(success);
- });
+ schedule(connection, request + std::to_string(requestId),
iter->second,
+ [connection, request, requestId] {
+ if (request == "CONSUMER_STATS") {
+ proto::CommandConsumerStatsResponse response;
+ response.set_request_id(requestId);
+ connection->handleConsumerStatsResponse(response);
+ } else {
+ proto::CommandSuccess success;
+ success.set_request_id(requestId);
+ connection->handleSuccess(success);
+ }
+ });
return true;
} else {
return false;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 6e0ba86..9c741fa 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -847,48 +847,47 @@ void
MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(const BrokerConsumerSt
Lock lock(mutex_);
MultiTopicsBrokerConsumerStatsPtr statsPtr =
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
- LatchPtr latchPtr =
std::make_shared<Latch>(numberTopicPartitions_->load());
+ auto latchPtr =
std::make_shared<std::atomic_size_t>(numberTopicPartitions_->load());
lock.unlock();
size_t i = 0;
- consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const
ConsumerImplPtr& consumer) {
- size_t index = i++;
- auto weakSelf = weak_from_this();
- consumer->getBrokerConsumerStatsAsync([this, weakSelf, latchPtr,
statsPtr, index, callback](
- Result result, const
BrokerConsumerStats& stats) {
- auto self = weakSelf.lock();
- if (self) {
- handleGetConsumerStats(result, stats, latchPtr, statsPtr,
index, callback);
- }
+ auto failedResult = std::make_shared<std::atomic<Result>>(ResultOk);
+ consumers_.forEachValue(
+ [this, &latchPtr, &statsPtr, &i, callback, &failedResult](const
ConsumerImplPtr& consumer) {
+ size_t index = i++;
+ auto weakSelf = weak_from_this();
+ consumer->getBrokerConsumerStatsAsync(
+ [this, weakSelf, latchPtr, statsPtr, index, callback,
failedResult](
+ Result result, const BrokerConsumerStats& stats) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (result == ResultOk) {
+ std::lock_guard<std::mutex> lock{mutex_};
+ statsPtr->add(stats, index);
+ } else {
+ // Store the first failed result as the final failed result
+ auto expected = ResultOk;
+ failedResult->compare_exchange_strong(expected,
result);
+ }
+ if (--*latchPtr == 0) {
+ if (auto firstFailedResult =
failedResult->load(std::memory_order_acquire);
+ firstFailedResult == ResultOk) {
+ callback(ResultOk, BrokerConsumerStats{statsPtr});
+ } else {
+ // Fail the whole operation if any of the consumers failed
+ callback(firstFailedResult, {});
+ }
+ }
+ });
});
- });
}
void MultiTopicsConsumerImpl::getLastMessageIdAsync(const
BrokerGetLastMessageIdCallback& callback) {
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
}
-void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res,
- const
BrokerConsumerStats& brokerConsumerStats,
- const LatchPtr& latchPtr,
- const
MultiTopicsBrokerConsumerStatsPtr& statsPtr,
- size_t index,
- const
BrokerConsumerStatsCallback& callback) {
- Lock lock(mutex_);
- if (res == ResultOk) {
- latchPtr->countdown();
- statsPtr->add(brokerConsumerStats, index);
- } else {
- lock.unlock();
- callback(res, BrokerConsumerStats());
- return;
- }
- if (latchPtr->getCount() == 0) {
- lock.unlock();
- callback(ResultOk, BrokerConsumerStats(statsPtr));
- }
-}
-
std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const
std::vector<std::string>& topics) {
TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index b22227e..dc62865 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -28,7 +28,6 @@
#include "ConsumerImpl.h"
#include "ConsumerInterceptors.h"
#include "Future.h"
-#include "Latch.h"
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
@@ -100,9 +99,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback)
override;
- void handleGetConsumerStats(Result, const BrokerConsumerStats&, const
LatchPtr&,
- const MultiTopicsBrokerConsumerStatsPtr&,
size_t,
- const BrokerConsumerStatsCallback&);
// return first topic name when all topics name valid, or return null pointer
static std::shared_ptr<TopicName> topicNamesValid(const
std::vector<std::string>& topics);
void unsubscribeOneTopicAsync(const std::string& topic, const
ResultCallback& callback);
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f1bca77..795613e 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -40,6 +40,7 @@
#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/Future.h"
+#include "lib/Latch.h"
#include "lib/LogUtils.h"
#include "lib/MessageIdUtil.h"
#include "lib/MultiTopicsConsumerImpl.h"
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index d59b50d..db3bc96 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -20,9 +20,13 @@
#include <pulsar/Client.h>
#include <chrono>
+#include <future>
+#include <thread>
#include "ThreadSafeMessages.h"
#include "lib/LogUtils.h"
+#include "lib/MockServer.h"
+#include "tests/PulsarFriend.h"
static const std::string lookupUrl = "pulsar://localhost:6650";
@@ -142,3 +146,29 @@ TEST(MultiTopicsConsumerTest,
testAcknowledgeInvalidMessageId) {
client.close();
}
+
+TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
+ Client client{lookupUrl};
+ std::vector<std::string> topics{"testGetConsumerStatsFail0",
"testGetConsumerStatsFail1"};
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
+
+ auto connection = *PulsarFriend::getConnections(client).begin();
+ auto mockServer = std::make_shared<MockServer>(connection);
+ connection->attachMockServer(mockServer);
+
+ mockServer->setRequestDelay({{"CONSUMER_STATS", 3000}});
+ auto future = std::async(std::launch::async, [&consumer]() {
+ BrokerConsumerStats stats;
+ return consumer.getBrokerConsumerStats(stats);
+ });
+ // Trigger the `getBrokerConsumerStats` in a new thread
+ future.wait_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ connection->handleKeepAliveTimeout();
+ ASSERT_EQ(ResultDisconnected, future.get());
+
+ mockServer->close();
+ client.close();
+}