This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/fifo_opt by this push:
new ba396383 feat: implement FifoProducerImpl
ba396383 is described below
commit ba3963831ca99ad075ec4855df82385edb80215f
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 20:32:16 2024 +0800
feat: implement FifoProducerImpl
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/rocketmq/FifoContext.cpp | 16 +++++++++++++++
cpp/source/rocketmq/FifoProducerImpl.cpp | 21 ++++++++++++++++++++
cpp/source/rocketmq/FifoProducerPartition.cpp | 8 +++-----
cpp/source/rocketmq/include/FifoContext.h | 4 ++++
cpp/source/rocketmq/include/FifoProducerImpl.h | 23 ++++++++++++++++++++++
.../rocketmq/include/FifoProducerPartition.h | 3 +++
6 files changed, 70 insertions(+), 5 deletions(-)
diff --git a/cpp/source/rocketmq/FifoContext.cpp
b/cpp/source/rocketmq/FifoContext.cpp
new file mode 100644
index 00000000..f1affd16
--- /dev/null
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -0,0 +1,16 @@
+#include "FifoContext.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+ : message(std::move(message)), callback(callback) {
+}
+
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+ this->message = std::move(rhs.message);
+ this->callback = rhs.callback;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp
b/cpp/source/rocketmq/FifoProducerImpl.cpp
new file mode 100644
index 00000000..2b33345c
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -0,0 +1,21 @@
+#include "FifoProducerImpl.h"
+
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+ auto& group = message->group();
+ std::size_t hash = hash_fn_(group);
+ std::size_t slot = hash % concurrency_;
+
+ FifoContext context(std::move(message), callback);
+ partitions_[slot]->add(std::move(context));
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp
b/cpp/source/rocketmq/FifoProducerPartition.cpp
index 1356bc5e..37526f76 100644
--- a/cpp/source/rocketmq/FifoProducerPartition.cpp
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -18,7 +18,7 @@ ROCKETMQ_NAMESPACE_BEGIN
void FifoProducerPartition::add(FifoContext&& context) {
{
absl::MutexLock lk(&messages_mtx_);
- messages_.emplace_back(context);
+ messages_.emplace_back(std::move(context));
}
trySend();
@@ -56,13 +56,11 @@ void FifoProducerPartition::onComplete(const
std::error_code& ec, const SendRece
}
// Put the message back to the front of the list
- FifoContext retry_context{};
SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
- retry_context.message = std::move(receipt_mut.message);
- retry_context.callback = callback;
+ FifoContext retry_context(std::move(receipt_mut.message), callback);
{
absl::MutexLock lk(&messages_mtx_);
- messages_.emplace_front(retry_context);
+ messages_.emplace_front(std::move(retry_context));
}
// Update inflight status
diff --git a/cpp/source/rocketmq/include/FifoContext.h
b/cpp/source/rocketmq/include/FifoContext.h
index 98927b4e..55812ba9 100644
--- a/cpp/source/rocketmq/include/FifoContext.h
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -9,6 +9,10 @@ ROCKETMQ_NAMESPACE_BEGIN
struct FifoContext {
MessageConstPtr message;
SendCallback callback;
+
+ FifoContext(MessageConstPtr message, SendCallback callback);
+
+ FifoContext(FifoContext&& rhs) noexcept;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h
b/cpp/source/rocketmq/include/FifoProducerImpl.h
index 82539c19..63d32ab2 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -1,9 +1,32 @@
#pragma once
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "FifoProducerPartition.h"
#include "ProducerImpl.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
ROCKETMQ_NAMESPACE_BEGIN
+class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
+public:
+ FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t
concurrency)
+ : producer_(producer), concurrency_(concurrency),
partitions_(concurrency) {
+ for (auto i = 0; i < concurrency; i++) {
+ partitions_[i] = std::make_shared<FifoProducerPartition>(producer_);
+ }
+ };
+
+ void send(MessageConstPtr message, SendCallback callback);
+private:
+ std::shared_ptr<ProducerImpl> producer_;
+ std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+ std::size_t concurrency_;
+ std::hash<std::string> hash_fn_;
+};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h
b/cpp/source/rocketmq/include/FifoProducerPartition.h
index a2f4ae3f..406b8fa6 100644
--- a/cpp/source/rocketmq/include/FifoProducerPartition.h
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -18,6 +18,9 @@ ROCKETMQ_NAMESPACE_BEGIN
class FifoProducerPartition : public
std::enable_shared_from_this<FifoProducerPartition> {
public:
+ FifoProducerPartition(std::shared_ptr<ProducerImpl> producer) :
producer_(producer) {
+ }
+
void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
void trySend() LOCKS_EXCLUDED(messages_mtx_);