This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new ba396383 feat: implement FifoProducerImpl
ba396383 is described below

commit ba3963831ca99ad075ec4855df82385edb80215f
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 20:32:16 2024 +0800

    feat: implement FifoProducerImpl
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/rocketmq/FifoContext.cpp                | 16 +++++++++++++++
 cpp/source/rocketmq/FifoProducerImpl.cpp           | 21 ++++++++++++++++++++
 cpp/source/rocketmq/FifoProducerPartition.cpp      |  8 +++-----
 cpp/source/rocketmq/include/FifoContext.h          |  4 ++++
 cpp/source/rocketmq/include/FifoProducerImpl.h     | 23 ++++++++++++++++++++++
 .../rocketmq/include/FifoProducerPartition.h       |  3 +++
 6 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/cpp/source/rocketmq/FifoContext.cpp 
b/cpp/source/rocketmq/FifoContext.cpp
new file mode 100644
index 00000000..f1affd16
--- /dev/null
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -0,0 +1,16 @@
+#include "FifoContext.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+    : message(std::move(message)), callback(callback) {
+}
+
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+  this->message = std::move(rhs.message);
+  this->callback = rhs.callback;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp 
b/cpp/source/rocketmq/FifoProducerImpl.cpp
new file mode 100644
index 00000000..2b33345c
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -0,0 +1,21 @@
+#include "FifoProducerImpl.h"
+
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+  auto& group = message->group();
+  std::size_t hash = hash_fn_(group);
+  std::size_t slot = hash % concurrency_;
+
+  FifoContext context(std::move(message), callback);
+  partitions_[slot]->add(std::move(context));
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp 
b/cpp/source/rocketmq/FifoProducerPartition.cpp
index 1356bc5e..37526f76 100644
--- a/cpp/source/rocketmq/FifoProducerPartition.cpp
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -18,7 +18,7 @@ ROCKETMQ_NAMESPACE_BEGIN
 void FifoProducerPartition::add(FifoContext&& context) {
   {
     absl::MutexLock lk(&messages_mtx_);
-    messages_.emplace_back(context);
+    messages_.emplace_back(std::move(context));
   }
 
   trySend();
@@ -56,13 +56,11 @@ void FifoProducerPartition::onComplete(const 
std::error_code& ec, const SendRece
   }
 
   // Put the message back to the front of the list
-  FifoContext retry_context{};
   SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
-  retry_context.message = std::move(receipt_mut.message);
-  retry_context.callback = callback;
+  FifoContext retry_context(std::move(receipt_mut.message), callback);
   {
     absl::MutexLock lk(&messages_mtx_);
-    messages_.emplace_front(retry_context);
+    messages_.emplace_front(std::move(retry_context));
   }
 
   // Update inflight status
diff --git a/cpp/source/rocketmq/include/FifoContext.h 
b/cpp/source/rocketmq/include/FifoContext.h
index 98927b4e..55812ba9 100644
--- a/cpp/source/rocketmq/include/FifoContext.h
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -9,6 +9,10 @@ ROCKETMQ_NAMESPACE_BEGIN
 struct FifoContext {
   MessageConstPtr message;
   SendCallback callback;
+
+  FifoContext(MessageConstPtr message, SendCallback callback);
+
+  FifoContext(FifoContext&& rhs) noexcept;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h 
b/cpp/source/rocketmq/include/FifoProducerImpl.h
index 82539c19..63d32ab2 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -1,9 +1,32 @@
 #pragma once
 
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "FifoProducerPartition.h"
 #include "ProducerImpl.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
+class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
+public:
+  FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t 
concurrency)
+      : producer_(producer), concurrency_(concurrency), 
partitions_(concurrency) {
+    for (auto i = 0; i < concurrency; i++) {
+      partitions_[i] = std::make_shared<FifoProducerPartition>(producer_);
+    }
+  };
+
+  void send(MessageConstPtr message, SendCallback callback);
 
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+  std::size_t concurrency_;
+  std::hash<std::string> hash_fn_;
+};
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h 
b/cpp/source/rocketmq/include/FifoProducerPartition.h
index a2f4ae3f..406b8fa6 100644
--- a/cpp/source/rocketmq/include/FifoProducerPartition.h
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -18,6 +18,9 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class FifoProducerPartition : public 
std::enable_shared_from_this<FifoProducerPartition> {
 public:
+  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer) : 
producer_(producer) {
+  }
+
   void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
 
   void trySend() LOCKS_EXCLUDED(messages_mtx_);

Reply via email to