This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new dc218187 fix: SendReceipt now contains std::unique_ptr<Message> being 
sent
dc218187 is described below

commit dc218187f78528ddaa7443ebdfa25b1fbd8e2fbf
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 17:39:10 2024 +0800

    fix: SendReceipt now contains std::unique_ptr<Message> being sent
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/include/rocketmq/Producer.h            |  5 +---
 cpp/include/rocketmq/SendReceipt.h         |  3 ++
 cpp/source/rocketmq/Producer.cpp           |  4 ---
 cpp/source/rocketmq/ProducerImpl.cpp       | 22 +++++++++++----
 cpp/source/rocketmq/SendContext.cpp        | 44 ++++++++++++++----------------
 cpp/source/rocketmq/include/ProducerImpl.h |  7 +----
 6 files changed, 43 insertions(+), 42 deletions(-)

diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb3..6b42843d 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
  */
 #pragma once
 
-#include <chrono>
-#include <functional>
 #include <memory>
 #include <system_error>
 #include <vector>
 
 #include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
 #include "Message.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
 #include "Transaction.h"
 #include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/include/rocketmq/SendReceipt.h
index 6c95ecc0..7eef6e79 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -19,6 +19,7 @@
 #include <string>
 
 #include "RocketMQ.h"
+#include "rocketmq/Message.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -28,6 +29,8 @@ struct SendReceipt {
   std::string message_id;
 
   std::string transaction_id;
+
+  MessageConstPtr message;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812ed..907d0a28 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
 #include <system_error>
 #include <utility>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
 #include "ProducerImpl.h"
 #include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index d5e6e485..04fc6266 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -192,19 +192,28 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
 SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) 
noexcept {
   ensureRunning(ec);
   if (ec) {
-    return {};
+    SPDLOG_WARN("Producer is not running");
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto topic_publish_info = getPublishInfo(message->topic());
   if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   std::vector<rmq::MessageQueue> message_queue_list;
   if 
(!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), 
message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", 
message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto mtx = std::make_shared<absl::Mutex>();
@@ -213,9 +222,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   SendReceipt   send_receipt;
 
   // Define callback
-  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) {
+  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) mutable {
     ec = code;
-    send_receipt = receipt;
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    send_receipt.message = std::move(receipt_mut.message);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -343,12 +353,14 @@ void ProducerImpl::send0(MessageConstPtr message, 
SendCallback callback, std::ve
   std::error_code ec;
   validate(*message, ec);
   if (ec) {
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
 
   if (list.empty()) {
     ec = ErrorCode::NotFound;
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
diff --git a/cpp/source/rocketmq/SendContext.cpp 
b/cpp/source/rocketmq/SendContext.cpp
index 00f51c10..bd97384d 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,11 +21,10 @@
 #include "ProducerImpl.h"
 #include "PublishStats.h"
 #include "Tag.h"
-#include "TransactionImpl.h"
 #include "opencensus/trace/span.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
-#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -36,18 +35,19 @@ void SendContext::onSuccess(const SendResult& send_result) 
noexcept {
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), 
MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), 
MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), 
publisher->config().client_id},
+                                  {Tag::clientIdTag(), 
producer->config().client_id},
                                   {Tag::invocationStatusTag(), "success"},
                               });
   }
@@ -57,7 +57,7 @@ void SendContext::onSuccess(const SendResult& send_result) 
noexcept {
   send_receipt.target = send_result.target;
   send_receipt.message_id = send_result.message_id;
   send_receipt.transaction_id = send_result.transaction_id;
-
+  send_receipt.message = std::move(message_);
   callback_(send_result.ec, send_receipt);
 }
 
@@ -68,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec) 
noexcept {
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), 
MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), 
MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), 
publisher->config().client_id},
+                                  {Tag::clientIdTag(), 
producer->config().client_id},
                                   {Tag::invocationStatusTag(), "failure"},
                               });
   }
 
-  if (++attempt_times_ >= publisher->maxAttemptTimes()) {
-    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", 
attempt_times_, publisher->maxAttemptTimes());
-    callback_(ec, {});
-    return;
-  }
-
-  std::shared_ptr<ProducerImpl> producer = producer_.lock();
-  if (!producer) {
-    SPDLOG_WARN("Producer has been destructed");
-    callback_(ec, {});
+  if (++attempt_times_ >= producer->maxAttemptTimes()) {
+    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", 
attempt_times_, producer->maxAttemptTimes());
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
   if (candidates_.empty()) {
     SPDLOG_WARN("No alternative hosts to perform additional retries");
-    callback_(ec, {});
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
@@ -109,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec) 
noexcept {
   auto ctx = shared_from_this();
   // If publish message requests are throttled, retry after backoff
   if (ErrorCode::TooManyRequests == ec) {
-    auto&& backoff = publisher->backoff(attempt_times_);
+    auto&& backoff = producer->backoff(attempt_times_);
     SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry 
after {}ms", message_->topic(),
                  message_->id(), MixAll::millisecondsOf(backoff));
     auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..a75c3444 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
  */
 #pragma once
 
-#include <chrono>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <system_error>
 
 #include "ClientImpl.h"
-#include "ClientManagerImpl.h"
 #include "MixAll.h"
 #include "PublishInfoCallback.h"
+#include "PublishStats.h"
 #include "SendContext.h"
 #include "TopicPublishInfo.h"
 #include "TransactionImpl.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
 #include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 

Reply via email to