This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new b27ccdf6 feat: implement FifoProducerPartition
b27ccdf6 is described below

commit b27ccdf632dc5eb6442c49e9bbdad7c95eabe3f8
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 19:48:46 2024 +0800

    feat: implement FifoProducerPartition
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/rocketmq/FifoProducerPartition.cpp      | 77 ++++++++++++++++++++++
 cpp/source/rocketmq/include/FifoContext.h          | 14 ++++
 cpp/source/rocketmq/include/FifoProducerImpl.h     |  9 +++
 .../rocketmq/include/FifoProducerPartition.h       | 34 ++++++++++
 cpp/source/rocketmq/include/ProducerImpl.h         |  6 +-
 5 files changed, 136 insertions(+), 4 deletions(-)

diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp 
b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 00000000..1356bc5e
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,77 @@
+#include "FifoProducerPartition.h"
+
+#include <absl/synchronization/mutex.h>
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerPartition::add(FifoContext&& context) {
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_back(context);
+  }
+
+  trySend();
+}
+
+void FifoProducerPartition::trySend() {
+  bool expected = false;
+  if (inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
+    absl::MutexLock lk(&messages_mtx_);
+
+    FifoContext& ctx = messages_.front();
+    MessageConstPtr message = std::move(ctx.message);
+    SendCallback send_callback = ctx.callback;
+
+    std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+    auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& 
receipt) mutable {
+      partition->onComplete(ec, receipt, send_callback);
+    };
+    producer_->send(std::move(message), fifo_callback);
+    messages_.pop_front();
+  }
+}
+
+void FifoProducerPartition::onComplete(const std::error_code& ec, const 
SendReceipt& receipt, SendCallback& callback) {
+  if (!ec) {
+    callback(ec, receipt);
+    // update inflight status
+    bool expected = true;
+    if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
+      trySend();
+    } else {
+      SPDLOG_ERROR("Unexpected inflight status");
+    }
+    return;
+  }
+
+  // Put the message back to the front of the list
+  FifoContext retry_context{};
+  SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+  retry_context.message = std::move(receipt_mut.message);
+  retry_context.callback = callback;
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_front(retry_context);
+  }
+
+  // Update inflight status
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
+    trySend();
+  } else {
+    SPDLOG_ERROR("Unexpected inflight status");
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoContext.h 
b/cpp/source/rocketmq/include/FifoContext.h
new file mode 100644
index 00000000..98927b4e
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct FifoContext {
+  MessageConstPtr message;
+  SendCallback callback;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h 
b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 00000000..82539c19
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include "ProducerImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h 
b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 00000000..a2f4ae3f
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <absl/base/internal/thread_annotations.h>
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerPartition : public 
std::enable_shared_from_this<FifoProducerPartition> {
+public:
+  void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+  void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+  void onComplete(const std::error_code& ec, const SendReceipt& receipt, 
SendCallback& callback);
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+  absl::Mutex messages_mtx_;
+  std::atomic_bool inflight_{false};
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index d51bfc0e..b572f20d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -77,10 +77,8 @@ public:
    * Note we requrie application to transfer ownership of the message to send 
to avoid concurrent modification during
    * sent.
    *
-   * Regardless of the send result, SendReceipt would have the 
std::unique_ptr<const Message>, facilliating
-   * application to conduct customized retry policy.
-   *
-   * TODO: Refine this API.
+   * TODO: Refine this API. Current API is not good enough as it cannot handle 
the message back to its caller on publish
+   * failure.
    */
   void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 

Reply via email to