This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new d3962278 [ISSUE #1159] [C++] Use Pass-by-Value instead of Pass-by-Reference for attemptId in PushConsumer (#1160)
d3962278 is described below
commit d3962278fca112aeffcaf701549241fb37c7d9bf
Author: lizhimins <[email protected]>
AuthorDate: Fri Dec 26 15:20:47 2025 +0800
[ISSUE #1159] [C++] Use Pass-by-Value instead of Pass-by-Reference for attemptId in PushConsumer (#1160)
---
cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 13 +++++++------
cpp/source/rocketmq/ProcessQueueImpl.cpp | 6 +++---
cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h | 8 ++++----
3 files changed, 14 insertions(+), 13 deletions(-)
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index d1c3ba30..72ee3f8a 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -36,7 +36,7 @@
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQu
}
void AsyncReceiveMessageCallback::onCompletion(
- const std::error_code& ec, std::string& attempt_id, const
ReceiveMessageResult& result) {
+ const std::error_code& ec, const std::string& attempt_id, const
ReceiveMessageResult& result) {
std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
if (!process_queue) {
@@ -76,7 +76,7 @@ void AsyncReceiveMessageCallback::onCompletion(
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME =
"receive-later-task";
-void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string
attempt_id) {
+void AsyncReceiveMessageCallback::checkThrottleThenReceive(const std::string&
attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_WARN("Process queue should have been destructed");
@@ -94,7 +94,7 @@ void
AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string attempt_i
}
}
-void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
delay, std::string& attempt_id) {
+void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
delay, const std::string& attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
@@ -103,7 +103,7 @@ void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
auto client_instance = process_queue->getClientManager();
std::weak_ptr<AsyncReceiveMessageCallback>
receive_callback_weak_ptr(shared_from_this());
- auto task = [receive_callback_weak_ptr, &attempt_id]() {
+ auto task = [receive_callback_weak_ptr, attempt_id]() {
auto async_receive_ptr = receive_callback_weak_ptr.lock();
if (async_receive_ptr) {
async_receive_ptr->checkThrottleThenReceive(attempt_id);
@@ -114,7 +114,7 @@ void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
}
-void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string&
attempt_id) {
+void AsyncReceiveMessageCallback::receiveMessageImmediately(const std::string&
attempt_id) {
auto process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive
message request-response cycles");
@@ -128,8 +128,9 @@ void
AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& attempt
return;
}
+ std::string attempt_id_copy = attempt_id;
impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
- process_queue_shared_ptr->getFilterExpression(),
attempt_id);
+ process_queue_shared_ptr->getFilterExpression(),
attempt_id_copy);
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp
b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 59df8578..dc6f32d1 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -41,11 +41,11 @@ ProcessQueueImpl::ProcessQueueImpl(rmq::MessageQueue
message_queue, FilterExpres
invisible_time_(MixAll::millisecondsOf(MixAll::DEFAULT_INVISIBLE_TIME_)),
simple_name_(simpleNameOf(message_queue_)),
consumer_(std::move(consumer)),
client_manager_(std::move(client_instance)),
cached_message_quantity_(0), cached_message_memory_(0) {
- SPDLOG_DEBUG("Created ProcessQueue={}", simpleName());
+ SPDLOG_DEBUG("Created ProcessQueue={}", simple_name_);
}
ProcessQueueImpl::~ProcessQueueImpl() {
- SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is
destructed", simpleName());
+ SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is
destructed", simple_name_);
}
void ProcessQueueImpl::callback(std::shared_ptr<AsyncReceiveMessageCallback>
callback) {
@@ -120,7 +120,7 @@ void ProcessQueueImpl::popMessage(std::string& attempt_id) {
std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
auto callback =
- [cb, &attempt_id](const std::error_code& ec, const ReceiveMessageResult&
result) {
+ [cb, attempt_id](const std::error_code& ec, const ReceiveMessageResult&
result) {
std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock();
if (receive_cb) {
receive_cb->onCompletion(ec, attempt_id, result);
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b19f097b..32a7a475 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public
std::enable_shared_from_this<AsyncRec
public:
explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue>
process_queue);
- void onCompletion(const std::error_code& ec, std::string& attempt_id, const
ReceiveMessageResult& result);
+ void onCompletion(const std::error_code& ec, const std::string& attempt_id,
const ReceiveMessageResult& result);
- void receiveMessageLater(std::chrono::milliseconds delay, std::string&
attempt_id);
+ void receiveMessageLater(std::chrono::milliseconds delay, const std::string&
attempt_id);
- void receiveMessageImmediately(std::string& attempt_id);
+ void receiveMessageImmediately(const std::string& attempt_id);
private:
/**
@@ -44,7 +44,7 @@ private:
std::function<void(std::string)> receive_message_later_;
- void checkThrottleThenReceive(std::string attempt_id);
+ void checkThrottleThenReceive(const std::string& attempt_id);
static const char* RECEIVE_LATER_TASK_NAME;
};