This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 967529b [fix][client-cpp] Fix Reader segfault when messageListenerThreads=0 (#553)
967529b is described below
commit 967529b79f8fae55902b679b7e2248ff8e8f98c3
Author: zhanglistar <[email protected]>
AuthorDate: Tue Mar 17 19:57:26 2026 +0800
[fix][client-cpp] Fix Reader segfault when messageListenerThreads=0 (#553)
---
lib/ConsumerImpl.cc | 20 ++++++++++++----
lib/ExecutorService.cc | 7 +++++-
tests/ReaderTest.cc | 45 ++++++++++++++++++++++++++++++++++++
tests/RetryableOperationCacheTest.cc | 13 +++++++++++
4 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c3b839a..7cb4821 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
if (state == Closing || state == Closed) {
return;
}
+ if (!listenerExecutor_) {
+ LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message to avoid null dereference");
+ increaseAvailablePermits(cnx);
+ return;
+ }
uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by same consumer.");
@@ -663,8 +668,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
return;
}
// Trigger message listener callback in a separate thread
- while (numOfMessageReceived--) {
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
+ if (listenerExecutor_) {
+ while (numOfMessageReceived--) {
+ listenerExecutor_->postWork(
+ std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
+ }
}
}
}
@@ -713,8 +721,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
// has pending receive, direct callback.
if (asyncReceivedWaiting) {
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
- get_shared_this_ptr(), ResultOk, msg, callback));
+ if (listenerExecutor_) {
+ listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+ get_shared_this_ptr(), ResultOk, msg, callback));
+ } else {
+ notifyPendingReceivedCallback(ResultOk, msg, callback);
+ }
return;
}
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index eba7486..8bd8972 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -18,6 +18,8 @@
*/
#include "ExecutorService.h"
+#include <algorithm>
+
#include "LogUtils.h"
#include "TimeUtils.h"
DECLARE_LOG_OBJECT()
@@ -128,9 +130,12 @@ void ExecutorService::close(long timeoutMs) {
/////////////////////
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
- : executors_(nthreads), executorIdx_(0), mutex_() {}
+ : executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {}
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+ if (executors_.empty()) {
+ return nullptr;
+ }
idx %= executors_.size();
Lock lock(mutex_);
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 77719a1..3462b55 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <pulsar/ClientConfiguration.h>
#include <pulsar/Reader.h>
#include <time.h>
@@ -982,5 +983,49 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
assertStartMessageId(false, secondMsgId);
}
+// Regression test for segfault when Reader is used with messageListenerThreads=0.
+// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
+// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.
+TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
+ ClientConfiguration clientConf;
+ clientConf.setMessageListenerThreads(0);
+ Client client(serviceUrl, clientConf);
+
+ const std::string topicName = "testReaderWithZeroMessageListenerThreads-" + std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ ReaderConfiguration readerConf;
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+ constexpr int numMessages = 5;
+ for (int i = 0; i < numMessages; i++) {
+ Message msg = MessageBuilder().setContent("msg-" + std::to_string(i)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ int received = 0;
+ for (int i = 0; i < numMessages + 2; i++) {
+ bool hasMessageAvailable = false;
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ if (!hasMessageAvailable) {
+ break;
+ }
+ Message msg;
+ Result res = reader.readNext(msg, 3000);
+ ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i;
+ std::string content = msg.getDataAsString();
+ EXPECT_EQ("msg-" + std::to_string(received), content);
+ ++received;
+ }
+ EXPECT_EQ(received, numMessages);
+
+ producer.close();
+ reader.close();
+ client.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index c9b8a1d..2daaf3f 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -22,6 +22,7 @@
#include <chrono>
#include <stdexcept>
+#include "lib/ExecutorService.h"
#include "lib/RetryableOperationCache.h"
namespace pulsar {
@@ -82,6 +83,18 @@ class RetryableOperationCacheTest : public ::testing::Test {
using namespace pulsar;
+// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0).
+// After fix, nthreads is clamped to at least 1, so get() returns a valid executor.
+TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) {
+ ExecutorServiceProviderPtr provider = std::make_shared<ExecutorServiceProvider>(0);
+ for (int i = 0; i < 3; i++) {
+ ExecutorServicePtr executor = provider->get();
+ ASSERT_NE(executor, nullptr)
+ << "get() must not return null when created with 0 threads (clamped to 1)";
+ }
+ provider->close();
+}
+
TEST_F(RetryableOperationCacheTest, testRetry) {
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {