This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new 8eac0262 feat: implement builder for FifoProducer
8eac0262 is described below

commit 8eac0262254799cd08aa9f27cc6e2b659b1afe1e
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 21:16:11 2024 +0800

    feat: implement builder for FifoProducer
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/include/rocketmq/Configuration.h           |  6 +--
 cpp/include/rocketmq/FifoProducer.h            | 52 ++++++++++++++++++++++++
 cpp/source/base/Configuration.cpp              |  4 +-
 cpp/source/rocketmq/FifoProducer.cpp           | 56 ++++++++++++++++++++++++++
 cpp/source/rocketmq/include/ClientImpl.h       |  4 +-
 cpp/source/rocketmq/include/FifoProducerImpl.h |  4 ++
 6 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/cpp/include/rocketmq/Configuration.h 
b/cpp/include/rocketmq/Configuration.h
index 0037c270..6dcd4137 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@ public:
   }
 
   bool withSsl() const {
-    return withSsl_;
+    return tls_;
   }
 
 protected:
@@ -56,7 +56,7 @@ private:
   std::string               endpoints_;
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds 
request_timeout_{ConfigurationDefaults::RequestTimeout};
-  bool withSsl_ = true;
+  bool tls_ = true;
 };
 
 class ConfigurationBuilder {
@@ -67,7 +67,7 @@ public:
 
   ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds 
request_timeout);
 
-  ConfigurationBuilder& withSsl(bool enable);
+  ConfigurationBuilder& withSsl(bool with_ssl);
 
   Configuration build();
 
diff --git a/cpp/include/rocketmq/FifoProducer.h 
b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 00000000..fbf76620
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+/**
+ * Public handle of a FIFO producer. Every operation is delegated to a shared
+ * FifoProducerImpl.
+ *
+ * The constructor is private, so instances are only created by
+ * FifoProducerBuilder (declared as a friend); obtain one via newBuilder().
+ */
+class FifoProducer {
+  // The builder needs access to the private constructor and to start().
+  friend class FifoProducerBuilder;
+
+public:
+  /// Returns a new builder for configuring and creating a FifoProducer.
+  static FifoProducerBuilder newBuilder();
+
+  /// Passes `message` together with its `callback` on to the implementation.
+  void send(MessageConstPtr message, SendCallback callback);
+
+private:
+  explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) {
+  }
+
+  /// Starts the producer held by the implementation.
+  void start();
+
+  std::shared_ptr<FifoProducerImpl> impl_;
+};
+
+/**
+ * Fluent builder for FifoProducer. Each with*() setter returns a reference to
+ * the builder itself so that calls can be chained.
+ */
+class FifoProducerBuilder {
+public:
+  /// Creates the internal ProducerImpl that the setters below configure.
+  FifoProducerBuilder();
+
+  /// Copies endpoints, credentials provider, request timeout and the SSL flag
+  /// from `configuration` onto the internal producer.
+  FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+  /// Forwards `topics` to the internal producer.
+  FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+  /// Creates the FIFO implementation around the internal producer using
+  /// `concurrency`.
+  FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+  /// Wraps the implementation in a FifoProducer, starts it and returns it.
+  FifoProducer build();
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+  std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp 
b/cpp/source/base/Configuration.cpp
index 2a136d5d..66cff2e8 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@ ConfigurationBuilder& 
ConfigurationBuilder::withRequestTimeout(std::chrono::mill
   return *this;
 }
 
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
-  configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+  configuration_.tls_ = with_ssl;
   return *this;
 }
 
diff --git a/cpp/source/rocketmq/FifoProducer.cpp 
b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 00000000..da43b589
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,56 @@
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/// Factory entry point: returns a default-constructed builder.
+FifoProducerBuilder FifoProducer::newBuilder() {
+  return FifoProducerBuilder();
+}
+
+FifoProducerBuilder::FifoProducerBuilder() : 
producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+/**
+ * Transfers the settings carried by `configuration` onto the internal producer:
+ * a static name-server resolver built from the endpoints, the credentials
+ * provider, the request timeout and the SSL flag.
+ *
+ * @return this builder, for chaining.
+ */
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+  ProducerImpl& producer = *producer_impl_;
+  // The resolver is created in place and handed over as a temporary.
+  producer.withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration.endpoints()));
+  producer.withCredentialsProvider(configuration.credentialsProvider());
+  producer.withRequestTimeout(configuration.requestTimeout());
+  producer.withSsl(configuration.withSsl());
+  return *this;
+}
+
+/// Hands `topics` to the internal producer and returns this builder for chaining.
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+  ProducerImpl& producer = *producer_impl_;
+  producer.withTopics(topics);
+  return *this;
+}
+
+/**
+ * Creates the FifoProducerImpl that wraps the internal producer, configured
+ * with `concurrency`, and stores it for build().
+ *
+ * @return this builder, for chaining.
+ */
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+  impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency);
+  return *this;
+}
+
+/**
+ * Finalizes the builder: wraps the FIFO implementation in a FifoProducer,
+ * starts its internal producer and returns the handle.
+ *
+ * impl_ is only assigned by withConcurrency(). If the caller never invoked it,
+ * the implementation is created here with a concurrency of 1, so that start()
+ * does not dereference an empty shared_ptr.
+ */
+FifoProducer FifoProducerBuilder::build() {
+  if (!impl_) {
+    impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, std::size_t{1});
+  }
+  FifoProducer fifo_producer(impl_);
+  fifo_producer.start();
+  return fifo_producer;
+}
+
+/// Starts the producer that the implementation exposes through internalProducer().
+void FifoProducer::start() {
+  auto& producer = impl_->internalProducer();
+  producer->start();
+}
+
+/**
+ * Forwards `message` and `callback` to FifoProducerImpl::send().
+ *
+ * Both arguments are received by value and are not used afterwards, so both
+ * are moved into the implementation instead of being copied.
+ */
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+  impl_->send(std::move(message), std::move(callback));
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ClientImpl.h 
b/cpp/source/rocketmq/include/ClientImpl.h
index c266047a..d7693962 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@ public:
     client_config_.request_timeout = absl::FromChrono(request_timeout);
   }
 
-  void withSsl(bool enable) {
-    client_config_.withSsl = enable;
+  void withSsl(bool with_ssl) {
+    client_config_.withSsl = with_ssl;
   }
 
   /**
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h 
b/cpp/source/rocketmq/include/FifoProducerImpl.h
index 63d32ab2..cc11dcf6 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -22,6 +22,10 @@ public:
 
   void send(MessageConstPtr message, SendCallback callback);
 
+  std::shared_ptr<ProducerImpl>& internalProducer() {  // Accessor for the wrapped producer; hands out a mutable reference to the producer_ member.
+    return producer_;
+  }
+
 private:
   std::shared_ptr<ProducerImpl> producer_;
   std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;

Reply via email to