This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/fifo_opt by this push:
new b27ccdf6 feat: implement FifoProducerPartition
b27ccdf6 is described below
commit b27ccdf632dc5eb6442c49e9bbdad7c95eabe3f8
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 19:48:46 2024 +0800
feat: implement FifoProducerPartition
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/rocketmq/FifoProducerPartition.cpp | 77 ++++++++++++++++++++++
cpp/source/rocketmq/include/FifoContext.h | 14 ++++
cpp/source/rocketmq/include/FifoProducerImpl.h | 9 +++
.../rocketmq/include/FifoProducerPartition.h | 34 ++++++++++
cpp/source/rocketmq/include/ProducerImpl.h | 6 +-
5 files changed, 136 insertions(+), 4 deletions(-)
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp
b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 00000000..1356bc5e
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,77 @@
+#include "FifoProducerPartition.h"
+
+#include <absl/synchronization/mutex.h>
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Appends a message to the tail of this partition's pending queue and then
+ * attempts to start sending.
+ *
+ * @param context Message and completion callback. Its contents are moved into
+ *                the queue; the caller's object is left in a moved-from state.
+ */
+void FifoProducerPartition::add(FifoContext&& context) {
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    // `context` is a named rvalue reference and therefore an lvalue here.
+    // Without std::move this would copy-construct the element, which defeats
+    // the && signature and cannot compile if FifoContext is move-only
+    // (MessageConstPtr appears to be a std::unique_ptr<const Message>).
+    messages_.emplace_back(std::move(context));
+  }
+
+  // Must run after the lock is released: trySend() acquires messages_mtx_.
+  trySend();
+}
+
+/**
+ * Sends the message at the head of the pending queue, provided no other send
+ * for this partition is currently in flight.
+ *
+ * `inflight_` acts as a single token: whoever flips it from false to true owns
+ * the right to dispatch exactly one message. The token is released again in
+ * onComplete(), or right here when there is nothing to send.
+ */
+void FifoProducerPartition::trySend() {
+  bool expected = false;
+  if (!inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    // A send is already outstanding; its completion will drive the next one.
+    return;
+  }
+
+  FifoContext ctx;
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    if (messages_.empty()) {
+      // onComplete() calls trySend() after every message, including the last
+      // one, so an empty queue is the normal idle case. Calling front() here
+      // would be undefined behaviour. Release the token while still holding
+      // the lock: a concurrent add() then either enqueued before we looked, or
+      // will observe inflight_ == false once it gets past the lock.
+      inflight_.store(false, std::memory_order_relaxed);
+      return;
+    }
+    ctx = std::move(messages_.front());
+    messages_.pop_front();
+  }
+
+  // Dispatch outside the critical section. onComplete() locks messages_mtx_,
+  // so invoking send() under the lock would self-deadlock if the producer ever
+  // ran the callback on the calling thread.
+  std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+  SendCallback send_callback = std::move(ctx.callback);
+  auto fifo_callback = [partition, send_callback](const std::error_code& ec, const SendReceipt& receipt) mutable {
+    partition->onComplete(ec, receipt, send_callback);
+  };
+  producer_->send(std::move(ctx.message), fifo_callback);
+}
+
+/**
+ * Completion handler for the single in-flight send of this partition.
+ *
+ * On success the user callback is invoked. On failure the message is taken
+ * back out of the receipt and re-queued at the head so that ordering is
+ * preserved; the user callback is not invoked in that case. In both cases the
+ * in-flight token is then released and the next send is attempted.
+ *
+ * @param ec       Result of the send; a default-constructed value means success.
+ * @param receipt  Receipt supplied by the producer. On failure its `message`
+ *                 member is moved out for the retry.
+ * @param callback User callback associated with the message.
+ */
+void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
+  if (!ec) {
+    callback(ec, receipt);
+  } else {
+    // The callback signature only offers a const receipt, yet the message has
+    // to be reclaimed for the retry, hence the const_cast.
+    // NOTE(review): this is only well-defined if the referenced SendReceipt
+    // object is not itself const - confirm at the call site in ProducerImpl.
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    FifoContext retry_context{};
+    retry_context.message = std::move(receipt_mut.message);
+    if (retry_context.message) {
+      retry_context.callback = callback;
+      absl::MutexLock lk(&messages_mtx_);
+      // Move, not copy: the context owns the message.
+      messages_.emplace_front(std::move(retry_context));
+    } else {
+      // The receipt did not hand the message back, so there is nothing to
+      // retry. Surface the failure instead of re-queueing a null message.
+      callback(ec, receipt);
+    }
+  }
+
+  // Release the in-flight token and move on to the next pending message.
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+    trySend();
+  } else {
+    SPDLOG_ERROR("Unexpected inflight status");
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoContext.h
b/cpp/source/rocketmq/include/FifoContext.h
new file mode 100644
index 00000000..98927b4e
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct FifoContext {  // One pending FIFO send: a message paired with its completion callback.
+ MessageConstPtr message;  // Message to send; moved out when the send is dispatched.
+ SendCallback callback;  // Invoked with the error code and receipt once the send completes.
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h
b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 00000000..82539c19
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include "ProducerImpl.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h
b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 00000000..a2f4ae3f
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <absl/base/internal/thread_annotations.h>
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Queue of pending messages for one FIFO partition that lets at most one send
+ * be outstanding at a time.
+ *
+ * Instances must be owned by a std::shared_ptr: trySend() calls
+ * shared_from_this() to keep the partition alive until the send completes.
+ */
+class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> {
+public:
+  /// Creates a partition without a producer. trySend() dereferences the
+  /// producer, so prefer the constructor below.
+  FifoProducerPartition() = default;
+
+  /// Creates a partition that publishes through `producer`.
+  explicit FifoProducerPartition(const std::shared_ptr<ProducerImpl>& producer) : producer_(producer) {
+  }
+
+  /// Enqueues `context` at the tail and attempts to start sending.
+  void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+  /// Sends the head of the queue if no send is currently in flight.
+  void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+  /// Completion handler for the in-flight send; re-queues the message at the
+  /// head on failure, then attempts the next send.
+  void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback)
+      LOCKS_EXCLUDED(messages_mtx_);
+
+private:
+  /// Producer used to publish messages.
+  std::shared_ptr<ProducerImpl> producer_;
+
+  /// Pending messages, head first.
+  std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+  absl::Mutex messages_mtx_;
+
+  /// True while a send for this partition is outstanding.
+  std::atomic_bool inflight_{false};
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index d51bfc0e..b572f20d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -77,10 +77,8 @@ public:
 * Note we require the application to transfer ownership of the message to send
to avoid concurrent modification during
 * sending.
*
- * Regardless of the send result, SendReceipt would have the
std::unique_ptr<const Message>, facilliating
- * application to conduct customized retry policy.
- *
- * TODO: Refine this API.
+ * TODO: Refine this API. The current API is not good enough as it cannot hand
the message back to its caller on publish
+ * failure.
*/
void send(MessageConstPtr message, std::error_code& ec, Transaction&
transaction);