This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new b727270f [ISSUE #969] C++ client support attemptId in PushConsumer
(#970)
b727270f is described below
commit b727270f75e728b23c63b977cf6d233444c97e61
Author: lizhimins <[email protected]>
AuthorDate: Thu Mar 27 13:44:06 2025 +0800
[ISSUE #969] C++ client support attemptId in PushConsumer (#970)
---
cpp/source/base/Message.cpp | 1 -
cpp/source/base/Protocol.cpp | 3 +-
.../rocketmq/AsyncReceiveMessageCallback.cpp | 40 ++++++++++--------
cpp/source/rocketmq/ProcessQueueImpl.cpp | 48 ++++++++++++++--------
cpp/source/rocketmq/PushConsumerImpl.cpp | 18 ++++----
.../rocketmq/include/AsyncReceiveMessageCallback.h | 10 ++---
cpp/source/rocketmq/include/ProcessQueue.h | 2 +-
cpp/source/rocketmq/include/ProcessQueueImpl.h | 6 +--
cpp/source/rocketmq/include/PushConsumerImpl.h | 5 ++-
9 files changed, 78 insertions(+), 55 deletions(-)
diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index cd3cc236..8d82bd3f 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -20,7 +20,6 @@
#include <memory>
#include "UniqueIdGenerator.h"
-#include "absl/memory/memory.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/source/base/Protocol.cpp b/cpp/source/base/Protocol.cpp
index 91a643b3..70562a2b 100644
--- a/cpp/source/base/Protocol.cpp
+++ b/cpp/source/base/Protocol.cpp
@@ -72,7 +72,8 @@ bool operator==(const rmq::MessageQueue& lhs, const
rmq::MessageQueue& rhs) {
}
std::string simpleNameOf(const rmq::MessageQueue& m) {
- return fmt::format("{}{}-{}-{}", m.topic().resource_namespace(),
m.topic().name(), m.id(), m.broker().name());
+ return fmt::format("{}@{}@{}@{}",
+ m.topic().resource_namespace(), m.topic().name(), m.id(),
m.broker().name());
}
bool operator==(const std::vector<rmq::MessageQueue>& lhs, const
std::vector<rmq::MessageQueue>& rhs) {
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index f68c9f88..d1c3ba30 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -30,10 +30,14 @@ ROCKETMQ_NAMESPACE_BEGIN
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue>
process_queue)
: process_queue_(std::move(process_queue)) {
- receive_message_later_ =
std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this);
+
+ receive_message_later_ = std::bind(
+ &AsyncReceiveMessageCallback::checkThrottleThenReceive, this,
std::placeholders::_1);
}
-void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec,
const ReceiveMessageResult& result) {
+void AsyncReceiveMessageCallback::onCompletion(
+ const std::error_code& ec, std::string& attempt_id, const
ReceiveMessageResult& result) {
+
std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_INFO("Process queue has been destructed.");
@@ -47,18 +51,19 @@ void AsyncReceiveMessageCallback::onCompletion(const
std::error_code& ec, const
if (ec == ErrorCode::TooManyRequests) {
SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms.
Queue={}", process_queue->simpleName());
- receiveMessageLater(std::chrono::milliseconds(20));
+ receiveMessageLater(std::chrono::milliseconds(20), attempt_id);
return;
}
if (ec == ErrorCode::NoContent) {
- checkThrottleThenReceive();
+ checkThrottleThenReceive("");
return;
}
if (ec) {
- SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1
second.", process_queue->simpleName(), ec.message());
- receiveMessageLater(std::chrono::seconds (1));
+ SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1
second.", process_queue->simpleName(),
+ ec.message());
+ receiveMessageLater(std::chrono::seconds(1), attempt_id);
return;
}
@@ -66,12 +71,12 @@ void AsyncReceiveMessageCallback::onCompletion(const
std::error_code& ec, const
result.source_host, result.messages.size(),
process_queue->simpleName());
process_queue->accountCache(result.messages);
consumer->getConsumeMessageService()->dispatch(process_queue,
result.messages);
- checkThrottleThenReceive();
+ checkThrottleThenReceive("");
}
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME =
"receive-later-task";
-void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
+void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string
attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_WARN("Process queue should have been destructed");
@@ -82,14 +87,14 @@ void
AsyncReceiveMessageCallback::checkThrottleThenReceive() {
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive
messages later.",
process_queue->simpleName());
process_queue->syncIdleState();
- receiveMessageLater(std::chrono::seconds(1));
+ receiveMessageLater(std::chrono::seconds(1), attempt_id);
} else {
// Receive message immediately
- receiveMessageImmediately();
+ receiveMessageImmediately(attempt_id);
}
}
-void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
delay) {
+void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
delay, std::string& attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
@@ -98,17 +103,18 @@ void
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
auto client_instance = process_queue->getClientManager();
std::weak_ptr<AsyncReceiveMessageCallback>
receive_callback_weak_ptr(shared_from_this());
- auto task = [receive_callback_weak_ptr]() {
+ // Capture attempt_id by value: this task is run by the scheduler after
+ // `delay`, i.e. after this function has returned, so a by-reference capture
+ // of the caller-provided string could dangle by the time the task executes.
+ auto task = [receive_callback_weak_ptr, attempt_id]() {
auto async_receive_ptr = receive_callback_weak_ptr.lock();
if (async_receive_ptr) {
- async_receive_ptr->checkThrottleThenReceive();
+ async_receive_ptr->checkThrottleThenReceive(attempt_id);
}
};
- client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME,
delay, std::chrono::seconds(0));
+ client_instance->getScheduler()->schedule(
+ task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
}
-void AsyncReceiveMessageCallback::receiveMessageImmediately() {
+void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string&
attempt_id) {
auto process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive
message request-response cycles");
@@ -121,7 +127,9 @@ void
AsyncReceiveMessageCallback::receiveMessageImmediately() {
process_queue_shared_ptr->simpleName());
return;
}
- impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
process_queue_shared_ptr->getFilterExpression());
+
+ impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
+ process_queue_shared_ptr->getFilterExpression(),
attempt_id);
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp
b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 002325c0..59df8578 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -16,14 +16,13 @@
*/
#include "ProcessQueueImpl.h"
-#include <atomic>
#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
+#include "UniqueIdGenerator.h"
#include "AsyncReceiveMessageCallback.h"
-#include "ClientManagerImpl.h"
#include "MetadataConstants.h"
#include "Protocol.h"
#include "PushConsumerImpl.h"
@@ -98,39 +97,39 @@ bool ProcessQueueImpl::shouldThrottle() const {
return false;
}
-void ProcessQueueImpl::receiveMessage() {
+void ProcessQueueImpl::receiveMessage(std::string& attempt_id) {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
-
- popMessage();
+ popMessage(attempt_id);
}
-void ProcessQueueImpl::popMessage() {
+void ProcessQueueImpl::popMessage(std::string& attempt_id) {
rmq::ReceiveMessageRequest request;
absl::flat_hash_map<std::string, std::string> metadata;
auto consumer_client = consumer_.lock();
if (!consumer_client) {
return;
}
+
Signature::sign(consumer_client->config(), metadata);
- wrapPopMessageRequest(metadata, request);
+ wrapPopMessageRequest(metadata, request, attempt_id);
syncIdleState();
- SPDLOG_DEBUG("Try to pop message from {}", simpleNameOf(message_queue_));
+ SPDLOG_DEBUG("Receive message from={}, attemptId={}",
simpleNameOf(message_queue_), attempt_id);
std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
- auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult&
result) {
- std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
- if (recv_cb) {
- recv_cb->onCompletion(ec, result);
+ // Copy attempt_id into the closure (wrapPopMessageRequest above has already
+ // filled it in when it was empty). The callback is handed to
+ // client_manager_->receiveMessage and may be invoked after the caller's
+ // string has gone out of scope, so it must not be captured by reference.
+ // `mutable` lets the owned copy bind to onCompletion's std::string& parameter.
+ auto callback =
+ [cb, attempt_id](const std::error_code& ec, const ReceiveMessageResult&
result) mutable {
+ std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock();
+ if (receive_cb) {
+ receive_cb->onCompletion(ec, attempt_id, result);
}
};
- client_manager_->receiveMessage(urlOf(message_queue_), metadata, request,
-
absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout
+
-
consumer_client->config().request_timeout),
- callback);
+ auto timeout = absl::ToChronoMilliseconds(
+ consumer_client->config().subscriber.polling_timeout +
consumer_client->config().request_timeout);
+ client_manager_->receiveMessage(urlOf(message_queue_), metadata, request,
timeout, callback);
}
void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>&
messages) {
@@ -184,8 +183,18 @@ void
ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expres
}
}
+void generateAttemptId(std::string& attempt_id) {
+ const std::string unique_id = UniqueIdGenerator::instance().next();
+ if (unique_id.size() < 34) {
+ return;
+ }
+ attempt_id = fmt::format(
+ "{}-{}-{}-{}-{}", unique_id.substr(0, 8), unique_id.substr(8, 4),
+ unique_id.substr(12, 4), unique_id.substr(16, 4), unique_id.substr(20,
12));
+}
+
void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string,
std::string>& metadata,
- rmq::ReceiveMessageRequest&
request) {
+ rmq::ReceiveMessageRequest&
request, std::string& attempt_id) {
std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock();
assert(consumer);
request.mutable_group()->CopyFrom(consumer->config().subscriber.group);
@@ -205,6 +214,11 @@ void
ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, st
auto fraction = invisible_time_ -
std::chrono::duration_cast<std::chrono::seconds>(invisible_time_);
int32_t nano_seconds =
static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count());
request.mutable_invisible_duration()->set_nanos(nano_seconds);
+
+ if (attempt_id.empty()) {
+ generateAttemptId(attempt_id);
+ }
+ request.set_attempt_id(attempt_id);
}
std::weak_ptr<PushConsumerImpl> ProcessQueueImpl::getConsumer() {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp
b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 712ac814..68bb9f67 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -16,10 +16,8 @@
*/
#include "PushConsumerImpl.h"
-#include <atomic>
#include <cassert>
#include <chrono>
-#include <cstdint>
#include <cstdlib>
#include <string>
#include <system_error>
@@ -324,9 +322,10 @@ void PushConsumerImpl::syncProcessQueue(const std::string&
topic,
for (const auto& message_queue : message_queue_list) {
if (std::none_of(current.cbegin(), current.cend(),
[&](const rmq::MessageQueue& item) { return item ==
message_queue; })) {
- SPDLOG_INFO("Start to receive message from {} according to latest
assignment info from load balancer",
+ SPDLOG_DEBUG("Start to receive message from {} according to latest
assignment info from load balancer",
simpleNameOf(message_queue));
- if (!receiveMessage(message_queue, filter_expression)) {
+ std::string attempt_id;
+ if (!receiveMessage(message_queue, filter_expression, attempt_id)) {
if (!active()) {
SPDLOG_WARN("Failed to initiate receive message
request-response-cycle for {}", simpleNameOf(message_queue));
// TODO: remove it from current assignment such that a second
attempt will be made again in the next round.
@@ -350,9 +349,9 @@ std::shared_ptr<ProcessQueue>
PushConsumerImpl::getOrCreateProcessQueue(const rm
process_queue = process_queue_table_.at(simpleNameOf(message_queue));
} else {
SPDLOG_INFO("Create ProcessQueue for message queue[{}]",
simpleNameOf(message_queue));
- // create ProcessQueue
- process_queue =
- std::make_shared<ProcessQueueImpl>(message_queue, filter_expression,
shared_from_this(), client_manager_);
+ // create process queue object
+ process_queue = std::make_shared<ProcessQueueImpl>(
+ message_queue, filter_expression, shared_from_this(),
client_manager_);
std::shared_ptr<AsyncReceiveMessageCallback> receive_callback =
std::make_shared<AsyncReceiveMessageCallback>(process_queue);
process_queue->callback(receive_callback);
@@ -363,7 +362,8 @@ std::shared_ptr<ProcessQueue>
PushConsumerImpl::getOrCreateProcessQueue(const rm
}
bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue,
- const FilterExpression&
filter_expression) {
+ const FilterExpression&
filter_expression,
+ std::string& attempt_id) {
if (!active()) {
SPDLOG_INFO("PushConsumer has stopped. Drop further receive message
request");
return false;
@@ -379,7 +379,7 @@ bool PushConsumerImpl::receiveMessage(const
rmq::MessageQueue& message_queue,
SPDLOG_ERROR("Failed to resolve address for brokerName={}",
message_queue.broker().name());
return false;
}
- process_queue_ptr->receiveMessage();
+ process_queue_ptr->receiveMessage(attempt_id);
return true;
}
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index 5a134428..b19f097b 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public
std::enable_shared_from_this<AsyncRec
public:
explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue>
process_queue);
- void onCompletion(const std::error_code& ec, const ReceiveMessageResult&
result);
+ void onCompletion(const std::error_code& ec, std::string& attempt_id, const
ReceiveMessageResult& result);
- void receiveMessageLater(std::chrono::milliseconds delay);
+ void receiveMessageLater(std::chrono::milliseconds delay, std::string&
attempt_id);
- void receiveMessageImmediately();
+ void receiveMessageImmediately(std::string& attempt_id);
private:
/**
@@ -42,9 +42,9 @@ private:
*/
std::weak_ptr<ProcessQueue> process_queue_;
- std::function<void(void)> receive_message_later_;
+ std::function<void(std::string)> receive_message_later_;
- void checkThrottleThenReceive();
+ void checkThrottleThenReceive(std::string attempt_id);
static const char* RECEIVE_LATER_TASK_NAME;
};
diff --git a/cpp/source/rocketmq/include/ProcessQueue.h
b/cpp/source/rocketmq/include/ProcessQueue.h
index 0e8ce74d..c512be34 100644
--- a/cpp/source/rocketmq/include/ProcessQueue.h
+++ b/cpp/source/rocketmq/include/ProcessQueue.h
@@ -38,7 +38,7 @@ public:
virtual void callback(std::shared_ptr<AsyncReceiveMessageCallback> callback)
= 0;
- virtual void receiveMessage() = 0;
+ virtual void receiveMessage(std::string& attempt_id) = 0;
virtual std::string topic() const = 0;
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h
b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 75a5d2d3..b811e936 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -63,7 +63,7 @@ public:
std::shared_ptr<ClientManager> getClientManager() override;
- void receiveMessage() override;
+ void receiveMessage(std::string& attempt_id) override;
const std::string& simpleName() const override {
return simple_name_;
@@ -127,10 +127,10 @@ private:
*/
std::atomic<uint64_t> cached_message_memory_;
- void popMessage();
+ void popMessage(std::string& attempt_id);
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>&
metadata,
- rmq::ReceiveMessageRequest& request);
+ rmq::ReceiveMessageRequest& request, std::string&
attempt_id);
void wrapFilterExpression(rmq::FilterExpression* filter_expression);
};
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h
b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 8f360fda..42c3fe49 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -99,8 +99,9 @@ public:
const
FilterExpression& filter_expression)
LOCKS_EXCLUDED(process_queue_table_mtx_);
- bool receiveMessage(const rmq::MessageQueue& message_queue, const
FilterExpression& filter_expression)
- LOCKS_EXCLUDED(process_queue_table_mtx_);
+ bool receiveMessage(const rmq::MessageQueue& message_queue,
+ const FilterExpression& filter_expression,
+ std::string& attempt_id)
LOCKS_EXCLUDED(process_queue_table_mtx_);
uint32_t consumeThreadPoolSize() const;