This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/fifo_opt by this push:
new 8eac0262 feat: implement builder for FifoProducer
8eac0262 is described below
commit 8eac0262254799cd08aa9f27cc6e2b659b1afe1e
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 21:16:11 2024 +0800
feat: implement builder for FifoProducer
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/include/rocketmq/Configuration.h | 6 +--
cpp/include/rocketmq/FifoProducer.h | 52 ++++++++++++++++++++++++
cpp/source/base/Configuration.cpp | 4 +-
cpp/source/rocketmq/FifoProducer.cpp | 56 ++++++++++++++++++++++++++
cpp/source/rocketmq/include/ClientImpl.h | 4 +-
cpp/source/rocketmq/include/FifoProducerImpl.h | 4 ++
6 files changed, 119 insertions(+), 7 deletions(-)
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index 0037c270..6dcd4137 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@ public:
}
bool withSsl() const {
- return withSsl_;
+ return tls_;
}
protected:
@@ -56,7 +56,7 @@ private:
std::string endpoints_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
- bool withSsl_ = true;
+ bool tls_ = true;
};
class ConfigurationBuilder {
@@ -67,7 +67,7 @@ public:
ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout);
- ConfigurationBuilder& withSsl(bool enable);
+ ConfigurationBuilder& withSsl(bool with_ssl);
Configuration build();
diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 00000000..fbf76620
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+class FifoProducer {
+public:
+ static FifoProducerBuilder newBuilder();
+
+ void send(MessageConstPtr message, SendCallback callback);
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+
+ explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) {
+ }
+
+ void start();
+
+ friend class FifoProducerBuilder;
+};
+
+class FifoProducerBuilder {
+public:
+ FifoProducerBuilder();
+
+ FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+ FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+ FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+ FifoProducer build();
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+ std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 2a136d5d..66cff2e8 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill
return *this;
}
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
- configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+ configuration_.tls_ = with_ssl;
return *this;
}
diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 00000000..da43b589
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,56 @@
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoProducerBuilder FifoProducer::newBuilder() {
+ return {};
+}
+
+FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+ auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+ producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+ producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+ producer_impl_->withRequestTimeout(configuration.requestTimeout());
+ producer_impl_->withSsl(configuration.withSsl());
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+ producer_impl_->withTopics(topics);
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+ this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency);
+ return *this;
+}
+
+FifoProducer FifoProducerBuilder::build() {
+ FifoProducer fifo_producer(this->impl_);
+ fifo_producer.start();
+ return fifo_producer;
+}
+
+void FifoProducer::start() {
+ impl_->internalProducer()->start();
+}
+
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+ impl_->send(std::move(message), callback);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c266047a..d7693962 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@ public:
client_config_.request_timeout = absl::FromChrono(request_timeout);
}
- void withSsl(bool enable) {
- client_config_.withSsl = enable;
+ void withSsl(bool with_ssl) {
+ client_config_.withSsl = with_ssl;
}
/**
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
index 63d32ab2..cc11dcf6 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -22,6 +22,10 @@ public:
void send(MessageConstPtr message, SendCallback callback);
+ std::shared_ptr<ProducerImpl>& internalProducer() {
+ return producer_;
+ }
+
private:
std::shared_ptr<ProducerImpl> producer_;
std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;