This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/fifo_opt by this push:
new dc218187 fix: SendReceipt now contains std::unique_ptr<Message> being
sent
dc218187 is described below
commit dc218187f78528ddaa7443ebdfa25b1fbd8e2fbf
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 17:39:10 2024 +0800
fix: SendReceipt now contains std::unique_ptr<Message> being sent
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/include/rocketmq/Producer.h | 5 +---
cpp/include/rocketmq/SendReceipt.h | 3 ++
cpp/source/rocketmq/Producer.cpp | 4 ---
cpp/source/rocketmq/ProducerImpl.cpp | 22 +++++++++++----
cpp/source/rocketmq/SendContext.cpp | 44 ++++++++++++++----------------
cpp/source/rocketmq/include/ProducerImpl.h | 7 +----
6 files changed, 43 insertions(+), 42 deletions(-)
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb3..6b42843d 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
*/
#pragma once
-#include <chrono>
-#include <functional>
#include <memory>
#include <system_error>
#include <vector>
#include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
#include "Message.h"
#include "SendCallback.h"
#include "SendReceipt.h"
#include "Transaction.h"
#include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/include/rocketmq/SendReceipt.h
index 6c95ecc0..7eef6e79 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -19,6 +19,7 @@
#include <string>
#include "RocketMQ.h"
+#include "rocketmq/Message.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -28,6 +29,8 @@ struct SendReceipt {
std::string message_id;
std::string transaction_id;
+
+ MessageConstPtr message;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812ed..907d0a28 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
#include <system_error>
#include <utility>
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
#include "ProducerImpl.h"
#include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index d5e6e485..04fc6266 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -192,19 +192,28 @@ void ProducerImpl::wrapSendMessageRequest(const Message&
message, SendMessageReq
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec)
noexcept {
ensureRunning(ec);
if (ec) {
- return {};
+ SPDLOG_WARN("Producer is not running");
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto topic_publish_info = getPublishInfo(message->topic());
if (!topic_publish_info) {
+ SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
std::vector<rmq::MessageQueue> message_queue_list;
if
(!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(),
message_queue_list)) {
+ SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]",
message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto mtx = std::make_shared<absl::Mutex>();
@@ -213,9 +222,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message,
std::error_code& ec) noe
SendReceipt send_receipt;
// Define callback
- auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) {
+ auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) mutable {
ec = code;
- send_receipt = receipt;
+ SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ send_receipt.message = std::move(receipt_mut.message);
{
absl::MutexLock lk(mtx.get());
completed = true;
@@ -343,12 +353,14 @@ void ProducerImpl::send0(MessageConstPtr message,
SendCallback callback, std::ve
std::error_code ec;
validate(*message, ec);
if (ec) {
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
if (list.empty()) {
ec = ErrorCode::NotFound;
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
diff --git a/cpp/source/rocketmq/SendContext.cpp
b/cpp/source/rocketmq/SendContext.cpp
index 00f51c10..bd97384d 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,11 +21,10 @@
#include "ProducerImpl.h"
#include "PublishStats.h"
#include "Tag.h"
-#include "TransactionImpl.h"
#include "opencensus/trace/span.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "spdlog/spdlog.h"
-#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -36,18 +35,19 @@ void SendContext::onSuccess(const SendResult& send_result)
noexcept {
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(),
MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(),
MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(),
publisher->config().client_id},
+ {Tag::clientIdTag(),
producer->config().client_id},
{Tag::invocationStatusTag(), "success"},
});
}
@@ -57,7 +57,7 @@ void SendContext::onSuccess(const SendResult& send_result)
noexcept {
send_receipt.target = send_result.target;
send_receipt.message_id = send_result.message_id;
send_receipt.transaction_id = send_result.transaction_id;
-
+ send_receipt.message = std::move(message_);
callback_(send_result.ec, send_receipt);
}
@@ -68,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec)
noexcept {
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(),
MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(),
MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(),
publisher->config().client_id},
+ {Tag::clientIdTag(),
producer->config().client_id},
{Tag::invocationStatusTag(), "failure"},
});
}
- if (++attempt_times_ >= publisher->maxAttemptTimes()) {
- SPDLOG_WARN("Retried {} times, which exceeds the limit: {}",
attempt_times_, publisher->maxAttemptTimes());
- callback_(ec, {});
- return;
- }
-
- std::shared_ptr<ProducerImpl> producer = producer_.lock();
- if (!producer) {
- SPDLOG_WARN("Producer has been destructed");
- callback_(ec, {});
+ if (++attempt_times_ >= producer->maxAttemptTimes()) {
+ SPDLOG_WARN("Retried {} times, which exceeds the limit: {}",
attempt_times_, producer->maxAttemptTimes());
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
if (candidates_.empty()) {
SPDLOG_WARN("No alternative hosts to perform additional retries");
- callback_(ec, {});
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
@@ -109,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec)
noexcept {
auto ctx = shared_from_this();
// If publish message requests are throttled, retry after backoff
if (ErrorCode::TooManyRequests == ec) {
- auto&& backoff = publisher->backoff(attempt_times_);
+ auto&& backoff = producer->backoff(attempt_times_);
SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry
after {}ms", message_->topic(),
message_->id(), MixAll::millisecondsOf(backoff));
auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..a75c3444 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
*/
#pragma once
-#include <chrono>
#include <memory>
-#include <mutex>
#include <string>
#include <system_error>
#include "ClientImpl.h"
-#include "ClientManagerImpl.h"
#include "MixAll.h"
#include "PublishInfoCallback.h"
+#include "PublishStats.h"
#include "SendContext.h"
#include "TopicPublishInfo.h"
#include "TransactionImpl.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
#include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
#include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
ROCKETMQ_NAMESPACE_BEGIN