This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a9bb6ffb [ISSUE #1035] [C++] Supports configuring the number of client threads (#1155)
a9bb6ffb is described below
commit a9bb6ffb3ee85d4d0b9a23a2f22a80d4000cd71c
Author: lizhimins <[email protected]>
AuthorDate: Wed Dec 24 10:04:05 2025 +0800
[ISSUE #1035] [C++] Supports configuring the number of client threads (#1155)
---
cpp/include/rocketmq/Configuration.h | 7 +++++++
cpp/include/rocketmq/Producer.h | 2 +-
cpp/source/base/Configuration.cpp | 5 +++++
cpp/source/base/ThreadPoolImpl.cpp | 3 +--
cpp/source/client/ClientManagerImpl.cpp | 7 +++----
cpp/source/client/include/ClientConfig.h | 1 +
cpp/source/client/include/ClientManagerImpl.h | 2 +-
cpp/source/rocketmq/ClientImpl.cpp | 9 ++++++++-
cpp/source/rocketmq/Producer.cpp | 5 +++--
cpp/source/rocketmq/PushConsumer.cpp | 3 ++-
cpp/source/rocketmq/SimpleConsumer.cpp | 1 +
cpp/source/rocketmq/include/ClientImpl.h | 4 ++++
cpp/source/scheduler/SchedulerImpl.cpp | 5 +----
13 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index a653c87a..b748ae24 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -51,6 +51,10 @@ public:
return tls_;
}
+ std::uint32_t callbackThreads() const {
+ return callback_threads_;
+ }
+
protected:
friend class ConfigurationBuilder;
@@ -62,6 +66,7 @@ private:
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds
request_timeout_{ConfigurationDefaults::RequestTimeout};
bool tls_ = true;
+ std::uint32_t callback_threads_{2};
};
class ConfigurationBuilder {
@@ -76,6 +81,8 @@ public:
ConfigurationBuilder& withSsl(bool with_ssl);
+ ConfigurationBuilder& withCallbackThreads(std::uint32_t callback_threads);
+
Configuration build();
private:
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index be1026f8..e1c093c3 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -94,7 +94,7 @@ class ProducerBuilder {
public:
ProducerBuilder();
- ProducerBuilder& withConfiguration(Configuration configuration);
+ ProducerBuilder& withConfiguration(const Configuration& configuration);
ProducerBuilder& withTopics(const std::vector<std::string>& topics);
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 13330261..a790f536 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -48,6 +48,11 @@ ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
return *this;
}
+ConfigurationBuilder& ConfigurationBuilder::withCallbackThreads(std::uint32_t callback_threads){
+ configuration_.callback_threads_ = callback_threads;
+ return *this;
+}
+
Configuration ConfigurationBuilder::build() {
return std::move(configuration_);
}
diff --git a/cpp/source/base/ThreadPoolImpl.cpp b/cpp/source/base/ThreadPoolImpl.cpp
index f07ba8d7..f19ee1d9 100644
--- a/cpp/source/base/ThreadPoolImpl.cpp
+++ b/cpp/source/base/ThreadPoolImpl.cpp
@@ -22,8 +22,6 @@
#include "rocketmq/RocketMQ.h"
#include "rocketmq/State.h"
#include "spdlog/spdlog.h"
-#include <atomic>
-#include <cstdint>
#include <exception>
#include <system_error>
@@ -33,6 +31,7 @@ ThreadPoolImpl::ThreadPoolImpl(std::uint16_t workers)
: work_guard_(
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
workers_(workers) {
+ SPDLOG_INFO("ThreadPoolImpl created worker threads {}", workers);
}
void ThreadPoolImpl::start() {
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index adcd0708..9c6b7a0e 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -32,7 +32,6 @@
#include "ReceiveMessageContext.h"
#include "RpcClient.h"
#include "RpcClientImpl.h"
-#include "Scheduler.h"
#include "SchedulerImpl.h"
#include "UtilAll.h"
#include "google/protobuf/util/time_util.h"
@@ -42,11 +41,11 @@
ROCKETMQ_NAMESPACE_BEGIN
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl)
- : scheduler_(std::make_shared<SchedulerImpl>()),
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl, int thread_count)
+ : scheduler_(std::make_shared<SchedulerImpl>(2)),
resource_namespace_(std::move(resource_namespace)),
state_(State::CREATED),
- callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
+ callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(thread_count)),
with_ssl_(with_ssl) {
certificate_verifier_ =
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h
index 542cd116..dddcc4be 100644
--- a/cpp/source/client/include/ClientConfig.h
+++ b/cpp/source/client/include/ClientConfig.h
@@ -62,6 +62,7 @@ struct ClientConfig {
SubscriberConfig subscriber;
Metric metric;
bool withSsl;
+ std::uint32_t callback_threads{2};
std::unique_ptr<opencensus::trace::Sampler> sampler_;
};
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index c6e60064..08b5afa8 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -48,7 +48,7 @@ public:
* effectively.
* @param resource_namespace Abstract resource namespace, in which this
client manager lives.
*/
- explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true);
+ explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true, int thread_count = 1);
~ClientManagerImpl() override;
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index a5cfae8d..19b4e460 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -108,7 +108,9 @@ void ClientImpl::start() {
client_config_.client_id = clientId();
if (!client_manager_) {
client_manager_ = std::make_shared<ClientManagerImpl>(
- client_config_.resource_namespace, client_config_.withSsl);
+ client_config_.resource_namespace,
+ client_config_.withSsl,
+ client_config_.callback_threads);
client_manager_->start();
}
@@ -118,6 +120,11 @@ void ClientImpl::start() {
return;
}
+ // A gRPC I/O thread pool is created upon establishing a connection.
+ // - https://github.com/grpc/grpc/issues/28642
+ // - https://github.com/grpc/grpc/pull/31662
+ // The source code initializes the number of I/O threads as follows:
+ // int num_io_threads = grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
while (true) {
createSession(endpoint, false);
{
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 916c47a4..ebccf60e 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -76,14 +76,15 @@ ProducerBuilder Producer::newBuilder() {
return {};
}
-ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){};
+ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){}
-ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration) {
+ProducerBuilder& ProducerBuilder::withConfiguration(const Configuration& configuration) {
auto name_server_resolver =
std::make_shared<StaticNameServerResolver>(configuration.endpoints());
impl_->withNameServerResolver(std::move(name_server_resolver));
impl_->withResourceNamespace(configuration.resourceNamespace());
impl_->withCredentialsProvider(configuration.credentialsProvider());
impl_->withRequestTimeout(configuration.requestTimeout());
+ impl_->withCallbackThreads(configuration.callbackThreads());
impl_->withSsl(configuration.withSsl());
return *this;
}
diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp
index a2f22803..a6dd3d35 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -44,10 +44,11 @@ PushConsumer PushConsumerBuilder::build() {
impl->consumeThreadPoolSize(consume_thread_);
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
impl->withResourceNamespace(configuration_.resourceNamespace());
- impl->withSsl(configuration_.withSsl());
impl->withCredentialsProvider(configuration_.credentialsProvider());
impl->withRequestTimeout(configuration_.requestTimeout());
impl->withFifoConsumeAccelerator(fifo_consume_accelerator_);
+ impl->withCallbackThreads(configuration_.callbackThreads());
+ impl->withSsl(configuration_.withSsl());
impl->start();
return PushConsumer(impl);
}
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index f321247f..152c0537 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -133,6 +133,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
simple_consumer.impl_->withResourceNamespace(configuration_.resourceNamespace());
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
+ simple_consumer.impl_->withCallbackThreads(configuration_.callbackThreads());
simple_consumer.impl_->withSsl(configuration_.withSsl());
for (const auto& entry : subscriptions_) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 96865399..01967384 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -102,6 +102,10 @@ public:
client_config_.withSsl = with_ssl;
}
+ void withCallbackThreads(std::uint32_t callback_threads) {
+ client_config_.callback_threads = callback_threads;
+ }
+
void withFifoConsumeAccelerator(bool fifo_consume_accelerator) {
client_config_.subscriber.fifo_consume_accelerator =
fifo_consume_accelerator;
}
diff --git a/cpp/source/scheduler/SchedulerImpl.cpp b/cpp/source/scheduler/SchedulerImpl.cpp
index 1c036df2..cb05718b 100644
--- a/cpp/source/scheduler/SchedulerImpl.cpp
+++ b/cpp/source/scheduler/SchedulerImpl.cpp
@@ -16,10 +16,7 @@
*/
#include "SchedulerImpl.h"
-#include <atomic>
#include <cassert>
-#include <cstdint>
-#include <cstdlib>
#include <exception>
#include <functional>
#include <memory>
@@ -27,7 +24,6 @@
#include <thread>
#include "absl/memory/memory.h"
-#include "asio/error_code.hpp"
#include "asio/executor_work_guard.hpp"
#include "asio/io_context.hpp"
#include "asio/steady_timer.hpp"
@@ -39,6 +35,7 @@ SchedulerImpl::SchedulerImpl(std::uint32_t worker_num)
: work_guard_(
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
worker_num_(worker_num) {
+ SPDLOG_INFO("SchedulerImpl created worker thread {}", worker_num);
}
SchedulerImpl::SchedulerImpl() :
SchedulerImpl(std::thread::hardware_concurrency()) {