This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/develop-cpp by this push:
     new eb7669ed Fifo opt (#732)
eb7669ed is described below

commit eb7669ed11b03a74f361beff7915e5b99762c4d0
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Apr 22 10:22:29 2024 +0800

    Fifo opt (#732)
    
    * Prepare to optimize FIFO publishing
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * fix: SendReceipt now contains std::unique_ptr<Message> being sent
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * fix: add doc explaining why we are taking ownership of the message being sent
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * feat: implement FifoProducerPartition
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * feat: implement FifoProducerImpl
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * feat: implement builder for FifoProducer
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * fix: prepare to debug
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * fix: log sending stages
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    ---------
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/examples/CMakeLists.txt                        |  1 +
 ...oducerWithAsync.cpp => ExampleFifoProducer.cpp} | 64 ++++++++-------
 cpp/examples/ExampleProducerWithAsync.cpp          |  1 -
 cpp/examples/ExamplePushConsumer.cpp               |  1 -
 cpp/examples/ExampleSimpleConsumer.cpp             |  1 -
 cpp/include/rocketmq/Configuration.h               |  6 +-
 cpp/include/rocketmq/FifoProducer.h                | 52 +++++++++++++
 cpp/include/rocketmq/Producer.h                    |  5 +-
 cpp/include/rocketmq/SendReceipt.h                 |  7 +-
 cpp/source/base/Configuration.cpp                  |  4 +-
 cpp/source/client/ClientManagerImpl.cpp            | 88 ++++++++++-----------
 cpp/source/client/include/ClientManager.h          | 10 +--
 cpp/source/client/include/ClientManagerImpl.h      | 14 +---
 cpp/source/client/include/SendResult.h             | 17 ++++
 cpp/source/client/include/SendResultCallback.h     | 11 +++
 cpp/source/rocketmq/FifoContext.cpp                | 16 ++++
 cpp/source/rocketmq/FifoProducer.cpp               | 56 +++++++++++++
 cpp/source/rocketmq/FifoProducerImpl.cpp           | 21 +++++
 cpp/source/rocketmq/FifoProducerPartition.cpp      | 91 ++++++++++++++++++++++
 cpp/source/rocketmq/Producer.cpp                   |  4 -
 cpp/source/rocketmq/ProducerImpl.cpp               | 47 ++++++-----
 cpp/source/rocketmq/SendContext.cpp                | 53 ++++++-------
 cpp/source/rocketmq/include/ClientImpl.h           |  4 +-
 cpp/source/rocketmq/include/FifoContext.h          | 18 +++++
 cpp/source/rocketmq/include/FifoProducerImpl.h     | 37 +++++++++
 .../rocketmq/include/FifoProducerPartition.h       | 39 ++++++++++
 cpp/source/rocketmq/include/ProducerImpl.h         | 28 +++++--
 cpp/source/rocketmq/include/SendContext.h          | 10 +--
 cpp/tools/trouble_shooting.sh                      |  0
 29 files changed, 537 insertions(+), 169 deletions(-)

diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b0399..27304477 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@ function(add_example name file)
 endfunction()
 
 add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
 add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
 add_example(example_producer_with_fifo_message 
ExampleProducerWithFifoMessage.cpp)
 add_example(example_producer_with_timed_message 
ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp 
b/cpp/examples/ExampleFifoProducer.cpp
similarity index 77%
copy from cpp/examples/ExampleProducerWithAsync.cpp
copy to cpp/examples/ExampleFifoProducer.cpp
index 63b7611c..9d99be36 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -17,16 +17,19 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <iostream>
-#include <mutex>
+#include <memory>
 #include <random>
 #include <string>
 #include <system_error>
 
 #include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
 
 using namespace ROCKETMQ_NAMESPACE;
 
@@ -94,31 +97,33 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to 
which messages are publi
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
-DEFINE_uint32(concurrency, 128, "Concurrency of async send");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
 DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   auto& logger = getLogger();
-  logger.setConsoleLevel(Level::Info);
-  logger.setLevel(Level::Info);
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
   logger.init();
 
+  // Access Key/Secret pair may be acquired from management console
   CredentialsProviderPtr credentials_provider;
   if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
     credentials_provider = 
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, 
FLAGS_access_secret);
   }
 
-  // In most case, you don't need to create too many producers, singletion 
pattern is recommended.
-  auto producer = Producer::newBuilder()
+  // In most case, you don't need to create too many producers, singleton 
pattern is recommended.
+  auto producer = FifoProducer::newBuilder()
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
                                              .withSsl(FLAGS_tls)
                                              .build())
+                      .withConcurrency(FLAGS_concurrency)
                       .withTopics({FLAGS_topic})
                       .build();
 
@@ -146,25 +151,32 @@ int main(int argc, char* argv[]) {
 
   std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency));
 
-  auto send_callback = [&](const std::error_code& ec, const SendReceipt& 
receipt) {
-    std::unique_lock<std::mutex> lk(mtx);
-    semaphore->release();
-    completed++;
-    count++;
-    if (completed >= FLAGS_total) {
-      cv.notify_all();
+  try {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withGroup("message-group" + std::to_string(i % 
FLAGS_concurrency))
+                         .withBody(body)
+                         .build();
+      std::error_code ec;
+      auto callback = [&](const std::error_code& ec, const SendReceipt& 
receipt) mutable {
+        completed++;
+        count++;
+        semaphore->release();
+
+        if (completed >= FLAGS_total) {
+          cv.notify_all();
+        }
+      };
+
+      semaphore->acquire();
+      producer.send(std::move(message), callback);
+      std::cout << "Cached No." << i << " message" << std::endl;
     }
-  };
-
-  for (std::size_t i = 0; i < FLAGS_total; ++i) {
-    auto message = Message::newBuilder()
-                       .withTopic(FLAGS_topic)
-                       .withTag("TagA")
-                       .withKeys({"Key-" + std::to_string(i)})
-                       .withBody(body)
-                       .build();
-    semaphore->acquire();
-    producer.send(std::move(message), send_callback);
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
   }
 
   {
@@ -179,4 +191,4 @@ int main(int argc, char* argv[]) {
   }
 
   return EXIT_SUCCESS;
-}
\ No newline at end of file
+}
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp 
b/cpp/examples/ExampleProducerWithAsync.cpp
index 63b7611c..d88dfc85 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -17,7 +17,6 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <iostream>
 #include <mutex>
 #include <random>
diff --git a/cpp/examples/ExamplePushConsumer.cpp 
b/cpp/examples/ExamplePushConsumer.cpp
index ab106cb7..66a85f4b 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <mutex>
 #include <thread>
 
 #include "gflags/gflags.h"
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp 
b/cpp/examples/ExampleSimpleConsumer.cpp
index 17a84b78..aedec71e 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <thread>
 
 #include "gflags/gflags.h"
 #include "rocketmq/Logger.h"
diff --git a/cpp/include/rocketmq/Configuration.h 
b/cpp/include/rocketmq/Configuration.h
index 0037c270..6dcd4137 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@ public:
   }
 
   bool withSsl() const {
-    return withSsl_;
+    return tls_;
   }
 
 protected:
@@ -56,7 +56,7 @@ private:
   std::string               endpoints_;
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds 
request_timeout_{ConfigurationDefaults::RequestTimeout};
-  bool withSsl_ = true;
+  bool tls_ = true;
 };
 
 class ConfigurationBuilder {
@@ -67,7 +67,7 @@ public:
 
   ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds 
request_timeout);
 
-  ConfigurationBuilder& withSsl(bool enable);
+  ConfigurationBuilder& withSsl(bool with_ssl);
 
   Configuration build();
 
diff --git a/cpp/include/rocketmq/FifoProducer.h 
b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 00000000..fbf76620
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+class FifoProducer {
+public:
+  static FifoProducerBuilder newBuilder();
+
+  void send(MessageConstPtr message, SendCallback callback);
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+
+  explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : 
impl_(std::move(impl)) {
+  }
+
+  void start();
+
+  friend class FifoProducerBuilder;
+};
+
+class FifoProducerBuilder {
+public:
+  FifoProducerBuilder();
+
+  FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+  FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+  FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+  FifoProducer build();
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+  std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb3..6b42843d 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
  */
 #pragma once
 
-#include <chrono>
-#include <functional>
 #include <memory>
 #include <system_error>
 #include <vector>
 
 #include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
 #include "Message.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
 #include "Transaction.h"
 #include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/include/rocketmq/SendReceipt.h
index 489df5ec..7eef6e79 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -16,20 +16,21 @@
  */
 #pragma once
 
-#include <cstdint>
 #include <string>
-#include <utility>
 
 #include "RocketMQ.h"
+#include "rocketmq/Message.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
 struct SendReceipt {
+  std::string target;
+
   std::string message_id;
 
   std::string transaction_id;
 
-  std::string target;
+  MessageConstPtr message;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp 
b/cpp/source/base/Configuration.cpp
index 2a136d5d..66cff2e8 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@ ConfigurationBuilder& 
ConfigurationBuilder::withRequestTimeout(std::chrono::mill
   return *this;
 }
 
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
-  configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+  configuration_.tls_ = with_ssl;
   return *this;
 }
 
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index 643d3741..7d724c7b 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -27,32 +27,27 @@
 #include "InvocationContext.h"
 #include "LogInterceptor.h"
 #include "LogInterceptorFactory.h"
-#include "MessageExt.h"
-#include "MetadataConstants.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "ReceiveMessageContext.h"
 #include "RpcClient.h"
 #include "RpcClientImpl.h"
 #include "Scheduler.h"
-#include "TlsHelper.h"
+#include "SchedulerImpl.h"
 #include "UtilAll.h"
-#include "apache/rocketmq/v2/definition.pb.h"
 #include "google/protobuf/util/time_util.h"
 #include "grpcpp/create_channel.h"
 #include "rocketmq/ErrorCode.h"
-#include "rocketmq/Logger.h"
-#include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool 
withSsl)
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool 
with_ssl)
     : scheduler_(std::make_shared<SchedulerImpl>()),
       resource_namespace_(std::move(resource_namespace)),
       state_(State::CREATED),
       
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
-      withSsl_(withSsl) {
+      with_ssl_(with_ssl) {
   certificate_verifier_ = 
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
   tls_channel_credential_options_.set_verify_server_certs(false);
   tls_channel_credential_options_.set_check_call_host(false);
@@ -285,7 +280,7 @@ void ClientManagerImpl::doHeartbeat() {
 bool ClientManagerImpl::send(const std::string& target_host,
                              const Metadata& metadata,
                              SendMessageRequest& request,
-                             SendCallback cb) {
+                             SendResultCallback cb) {
   assert(cb);
   SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", 
target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -311,15 +306,14 @@ bool ClientManagerImpl::send(const std::string& 
target_host,
       return;
     }
 
-    SendReceipt send_receipt = {};
-    send_receipt.target = target_host;
-    std::error_code ec;
+    SendResult send_result = {};
+    send_result.target = target_host;
     if (!invocation_context->status.ok()) {
       SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: 
{}, gRPC error message: {}",
                   invocation_context->remote_address, 
invocation_context->status.error_code(),
                   invocation_context->status.error_message());
-      ec = ErrorCode::RequestTimeout;
-      cb(ec, send_receipt);
+      send_result.ec = ErrorCode::RequestTimeout;
+      cb(send_result);
       return;
     }
 
@@ -328,8 +322,8 @@ bool ClientManagerImpl::send(const std::string& target_host,
       case rmq::Code::OK: {
         if (!invocation_context->response.entries().empty()) {
           auto first = invocation_context->response.entries().begin();
-          send_receipt.message_id = first->message_id();
-          send_receipt.transaction_id = first->transaction_id();
+          send_result.message_id = first->message_id();
+          send_result.transaction_id = first->transaction_id();
         } else {
           SPDLOG_ERROR("Unexpected send-message-response: {}", 
invocation_context->response.DebugString());
         }
@@ -338,127 +332,127 @@ bool ClientManagerImpl::send(const std::string& 
target_host,
 
       case rmq::Code::ILLEGAL_TOPIC: {
         SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::IllegalTopic;
+        send_result.ec = ErrorCode::IllegalTopic;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_TAG: {
         SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageTag;
+        send_result.ec = ErrorCode::IllegalMessageTag;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_KEY: {
         SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageKey;
+        send_result.ec = ErrorCode::IllegalMessageKey;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
         SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageGroup;
+        send_result.ec = ErrorCode::IllegalMessageGroup;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
         SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageProperty;
+        send_result.ec = ErrorCode::IllegalMessageProperty;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
         SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", 
status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessagePropertiesTooLarge;
+        send_result.ec = ErrorCode::MessagePropertiesTooLarge;
         break;
       }
 
       case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
         SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::MessageBodyTooLarge;
+        send_result.ec = ErrorCode::MessageBodyTooLarge;
         break;
       }
 
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::TopicNotFound;
+        send_result.ec = ErrorCode::TopicNotFound;
         break;
       }
 
       case rmq::Code::NOT_FOUND: {
         SPDLOG_WARN("NotFound: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::NotFound;
+        send_result.ec = ErrorCode::NotFound;
         break;
       }
 
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::Unauthorized;
+        send_result.ec = ErrorCode::Unauthorized;
         break;
       }
 
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
+        send_result.ec = ErrorCode::Forbidden;
         break;
       }
 
       case rmq::Code::MESSAGE_CORRUPTED: {
         SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::MessageCorrupted;
+        send_result.ec = ErrorCode::MessageCorrupted;
         break;
       }
 
       case rmq::Code::TOO_MANY_REQUESTS: {
         SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::TooManyRequests;
+        send_result.ec = ErrorCode::TooManyRequests;
         break;
       }
 
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::HA_NOT_AVAILABLE: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::PROXY_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), 
invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
         SPDLOG_WARN("Message-property-conflict-with-type: Host={}, 
Response={}", invocation_context->remote_address,
                     invocation_context->response.DebugString());
-        ec = ErrorCode::MessagePropertyConflictWithType;
+        send_result.ec = ErrorCode::MessagePropertyConflictWithType;
         break;
       }
 
       default: {
         SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. 
Host={}", invocation_context->remote_address);
-        ec = ErrorCode::NotSupported;
+        send_result.ec = ErrorCode::NotSupported;
         break;
       }
     }
 
-    cb(ec, send_receipt);
+    cb(send_result);
   };
 
   invocation_context->callback = completion_callback;
@@ -476,7 +470,7 @@ std::shared_ptr<grpc::Channel> 
ClientManagerImpl::createChannel(const std::strin
   
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
 interceptor_factories;
   
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
   auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-      target_host, withSsl_ ? channel_credential_ : 
grpc::InsecureChannelCredentials(), channel_arguments_,
+      target_host, with_ssl_ ? channel_credential_ : 
grpc::InsecureChannelCredentials(), channel_arguments_,
       std::move(interceptor_factories));
   return channel;
 }
@@ -520,28 +514,28 @@ void ClientManagerImpl::cleanRpcClients() {
   rpc_clients_.clear();
 }
 
-SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& 
message_queue,
-                                                   const SendMessageResponse& 
response,
-                                                   std::error_code& ec) {
-  SendReceipt send_receipt;
+SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& 
message_queue,
+                                                  const SendMessageResponse& 
response,
+                                                  std::error_code& ec) {
+  SendResult send_result;
 
   switch (response.status().code()) {
     case rmq::Code::OK: {
       assert(response.entries_size() > 0);
-      send_receipt.message_id = response.entries().begin()->message_id();
-      send_receipt.transaction_id = 
response.entries().begin()->transaction_id();
-      return send_receipt;
+      send_result.message_id = response.entries().begin()->message_id();
+      send_result.transaction_id = 
response.entries().begin()->transaction_id();
+      return send_result;
     }
     case rmq::Code::ILLEGAL_TOPIC: {
       ec = ErrorCode::BadRequest;
-      return send_receipt;
+      return send_result;
     }
     default: {
       // TODO: handle other cases.
       break;
     }
   }
-  return send_receipt;
+  return send_result;
 }
 
 void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
diff --git a/cpp/source/client/include/ClientManager.h 
b/cpp/source/client/include/ClientManager.h
index 56325fa4..02b232b2 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -22,14 +22,12 @@
 #include <system_error>
 
 #include "Client.h"
-#include "MessageExt.h"
 #include "Metadata.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClient.h"
 #include "Scheduler.h"
-#include "TelemetryBidiReactor.h"
+#include "SendResultCallback.h"
 #include "TopicRouteData.h"
-#include "rocketmq/SendCallback.h"
 #include "rocketmq/State.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -93,8 +91,10 @@ public:
   virtual void receiveMessage(const std::string& target, const Metadata& 
metadata, const ReceiveMessageRequest& request,
                               std::chrono::milliseconds timeout, 
ReceiveMessageCallback callback) = 0;
 
-  virtual bool send(const std::string& target_host, const Metadata& metadata, 
SendMessageRequest& request,
-                    SendCallback cb) = 0;
+  virtual bool send(const std::string& target_host,
+                    const Metadata& metadata,
+                    SendMessageRequest& request,
+                    SendResultCallback cb) = 0;
 
   virtual std::error_code notifyClientTermination(const std::string& 
target_host, const Metadata& metadata,
                                                   const 
NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h 
b/cpp/source/client/include/ClientManagerImpl.h
index 653fcad3..5f1b27ca 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -20,7 +20,6 @@
 #include <chrono>
 #include <cstdint>
 #include <functional>
-#include <future>
 #include <memory>
 #include <string>
 #include <system_error>
@@ -29,18 +28,13 @@
 #include "Client.h"
 #include "ClientManager.h"
 #include "InsecureCertificateVerifier.h"
-#include "InvocationContext.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClientImpl.h"
-#include "SchedulerImpl.h"
-#include "SendMessageContext.h"
-#include "TelemetryBidiReactor.h"
 #include "ThreadPoolImpl.h"
 #include "TopicRouteData.h"
 #include "absl/base/thread_annotations.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "absl/synchronization/mutex.h"
 #include "rocketmq/State.h"
 
@@ -54,7 +48,7 @@ public:
    * effectively.
    * @param resource_namespace Abstract resource namespace, in which this 
client manager lives.
    */
-  explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = 
true);
+  explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = 
true);
 
   ~ClientManagerImpl() override;
 
@@ -89,7 +83,7 @@ public:
   bool send(const std::string& target_host,
             const Metadata& metadata,
             SendMessageRequest& request,
-            SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
+            SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
 
   /**
    * Get a RpcClient according to the given target hosts, which follows scheme 
specified
@@ -105,7 +99,7 @@ public:
   RpcClientSharedPtr getRpcClient(const std::string& target_host, bool 
need_heartbeat = true) override
       LOCKS_EXCLUDED(rpc_clients_mtx_);
 
-  static SendReceipt processSendResponse(const rmq::MessageQueue& 
message_queue,
+  static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
                                          const SendMessageResponse& response,
                                          std::error_code& ec);
 
@@ -242,7 +236,7 @@ private:
   grpc::ChannelArguments channel_arguments_;
 
   bool trace_{false};
-  bool withSsl_;
+  bool with_ssl_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResult.h 
b/cpp/source/client/include/SendResult.h
new file mode 100644
index 00000000..d5f85277
--- /dev/null
+++ b/cpp/source/client/include/SendResult.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <system_error>
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct SendResult {
+  std::error_code ec;
+  std::string target;
+
+  std::string message_id;
+  std::string transaction_id;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SendResultCallback.h 
b/cpp/source/client/include/SendResultCallback.h
new file mode 100644
index 00000000..3dd5bda7
--- /dev/null
+++ b/cpp/source/client/include/SendResultCallback.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <functional>
+
+#include "SendResult.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+using SendResultCallback = std::function<void(const SendResult&)>;
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoContext.cpp 
b/cpp/source/rocketmq/FifoContext.cpp
new file mode 100644
index 00000000..f1affd16
--- /dev/null
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -0,0 +1,16 @@
+#include "FifoContext.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+    : message(std::move(message)), callback(callback) {
+}
+
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+  this->message = std::move(rhs.message);
+  this->callback = rhs.callback;
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 00000000..da43b589
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,69 @@
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+#include <utility>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Entry point of the fluent builder API.
+FifoProducerBuilder FifoProducer::newBuilder() {
+  return {};
+}
+
+// Creates the underlying ProducerImpl up front so that the with*() setters can configure it directly.
+FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+// Applies endpoints, credentials provider, request timeout and the SSL flag of the configuration to the producer.
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+  auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+  producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+  producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+  producer_impl_->withRequestTimeout(configuration.requestTimeout());
+  producer_impl_->withSsl(configuration.withSsl());
+  return *this;
+}
+
+// Forwards the topic list to the underlying producer.
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+  producer_impl_->withTopics(topics);
+  return *this;
+}
+
+// Creates the FIFO implementation with the requested number of partitions around the producer configured so far.
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+  this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency);
+  return *this;
+}
+
+// Builds the FifoProducer and starts it before returning it.
+FifoProducer FifoProducerBuilder::build() {
+  if (!this->impl_) {
+    // withConcurrency() was never called: use a single partition rather than dereferencing a null impl_ in start().
+    this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, 1);
+  }
+  FifoProducer fifo_producer(this->impl_);
+  fifo_producer.start();
+  return fifo_producer;
+}
+
+// Starts the wrapped ProducerImpl.
+void FifoProducer::start() {
+  impl_->internalProducer()->start();
+}
+
+// Hands the message and its completion callback over to the FIFO implementation; both are moved, not copied.
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+  impl_->send(std::move(message), std::move(callback));
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/FifoProducerImpl.cpp 
b/cpp/source/rocketmq/FifoProducerImpl.cpp
new file mode 100644
index 00000000..2b33345c
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -0,0 +1,21 @@
+#include "FifoProducerImpl.h"
+
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+  auto& group = message->group();
+  std::size_t hash = hash_fn_(group);
+  std::size_t slot = hash % concurrency_;
+
+  FifoContext context(std::move(message), callback);
+  partitions_[slot]->add(std::move(context));
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 00000000..94e1c722
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,103 @@
+#include "FifoProducerPartition.h"
+
+#include <absl/synchronization/mutex.h>
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+#include <utility>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Appends the context to the tail of the pending queue, then tries to dispatch the head of the queue.
+void FifoProducerPartition::add(FifoContext&& context) {
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_back(std::move(context));
+    SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size());
+  }
+
+  trySend();
+}
+
+// Dispatches the head of the pending queue unless a message of this partition is already in flight.
+//
+// inflight_ is claimed with a compare-and-swap so that at most one message is outstanding at a time. When the queue
+// turns out to be empty the claim is dropped again while messages_mtx_ is still held: add() enqueues under the same
+// mutex before it calls trySend(), so either its message is seen here or it finds the flag already released.
+//
+// The mutex is released before calling into the producer because the producer can invoke the callback synchronously
+// (e.g. when it is not running) and onComplete() acquires messages_mtx_ itself.
+void FifoProducerPartition::trySend() {
+  bool expected = false;
+  if (!inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    SPDLOG_DEBUG("There is an inflight message");
+    return;
+  }
+
+  absl::ReleasableMutexLock lk(&messages_mtx_);
+  if (messages_.empty()) {
+    SPDLOG_DEBUG("There is no more messages to send");
+    // Drop the claim; leaving it set would make every later add() believe a send is still in flight.
+    inflight_.store(false, std::memory_order_relaxed);
+    return;
+  }
+
+  FifoContext ctx(std::move(messages_.front()));
+  messages_.pop_front();
+  SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_);
+  lk.Release();
+
+  SendCallback send_callback = std::move(ctx.callback);
+  // The callback holds a shared_ptr to this partition, so the partition outlives the pending send.
+  std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+  auto fifo_callback = [partition, send_callback](const std::error_code& ec, const SendReceipt& receipt) mutable {
+    partition->onComplete(ec, receipt, send_callback);
+  };
+  SPDLOG_DEBUG("Sending FIFO message from {}", name_);
+  producer_->send(std::move(ctx.message), fifo_callback);
+}
+
+// Completion handler of a send issued by trySend().
+//
+// On success the user callback is invoked. On failure the message handed back in the receipt is re-queued at the
+// front, so it is retried before anything behind it; the user callback is only invoked if no message was handed back.
+// Either way the inflight flag is cleared and the next dispatch is attempted.
+//
+// NOTE(review): failed sends are retried without limit or backoff; a permanently failing message blocks its partition.
+void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
+  if (!ec) {
+    SPDLOG_DEBUG("{} completed OK", name_);
+    callback(ec, receipt);
+  } else {
+    SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+    // The receipt is const only by signature; the unsent message is taken back out of it for the retry.
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    if (receipt_mut.message) {
+      // Put the message back to the front of the list
+      FifoContext retry_context(std::move(receipt_mut.message), callback);
+      absl::MutexLock lk(&messages_mtx_);
+      messages_.emplace_front(std::move(retry_context));
+    } else {
+      // Nothing to retry with: surface the failure instead of re-queuing a null message.
+      callback(ec, receipt);
+    }
+  }
+
+  // Update inflight status
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+    trySend();
+  } else {
+    SPDLOG_ERROR("{}: Unexpected inflight status", name_);
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812ed..907d0a28 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
 #include <system_error>
 #include <utility>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
 #include "ProducerImpl.h"
 #include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index 73130161..34975e71 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,38 +17,27 @@
 #include "ProducerImpl.h"
 
 #include <algorithm>
-#include <apache/rocketmq/v2/definition.pb.h>
-
 #include <atomic>
 #include <cassert>
 #include <chrono>
-#include <limits>
 #include <memory>
 #include <system_error>
 #include <utility>
 
-#include "Client.h"
-#include "MessageGroupQueueSelector.h"
-#include "MetadataConstants.h"
+#include "apache/rocketmq/v2/definition.pb.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "PublishInfoCallback.h"
-#include "RpcClient.h"
 #include "SendContext.h"
-#include "SendMessageContext.h"
 #include "Signature.h"
-#include "Tag.h"
 #include "TracingUtility.h"
 #include "TransactionImpl.h"
-#include "UniqueIdGenerator.h"
 #include "UtilAll.h"
-#include "absl/strings/str_join.h"
 #include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/Tracing.h"
 #include "rocketmq/Transaction.h"
 #include "rocketmq/TransactionChecker.h"
 
@@ -203,19 +192,28 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
 SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) 
noexcept {
   ensureRunning(ec);
   if (ec) {
-    return {};
+    SPDLOG_WARN("Producer is not running");
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto topic_publish_info = getPublishInfo(message->topic());
   if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   std::vector<rmq::MessageQueue> message_queue_list;
   if 
(!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), 
message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", 
message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto mtx = std::make_shared<absl::Mutex>();
@@ -224,9 +222,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   SendReceipt   send_receipt;
 
   // Define callback
-  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) {
+  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) mutable {
     ec = code;
-    send_receipt = receipt;
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    send_receipt.message = std::move(receipt_mut.message);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -251,6 +250,7 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
   ensureRunning(ec);
   if (ec) {
     SendReceipt send_receipt;
+    send_receipt.message = std::move(message);
     cb(ec, send_receipt);
   }
 
@@ -264,6 +264,7 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
     // No route entries of the given topic is available
     if (ec) {
       SendReceipt send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -271,6 +272,7 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
     if (!publish_info) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -280,6 +282,7 @@ void ProducerImpl::send(MessageConstPtr message, 
SendCallback cb) {
     if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -338,12 +341,12 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> 
context) {
   Metadata metadata;
   Signature::sign(client_config_, metadata);
 
-  auto callback = [context](const std::error_code& ec, const SendReceipt& 
send_receipt) {
-    if (ec) {
-      context->onFailure(ec);
+  auto callback = [context](const SendResult& send_result) {
+    if (send_result.ec) {
+      context->onFailure(send_result.ec);
       return;
     }
-    context->onSuccess(send_receipt);
+    context->onSuccess(send_result);
   };
 
   client_manager_->send(target, metadata, request, callback);
@@ -354,12 +357,14 @@ void ProducerImpl::send0(MessageConstPtr message, 
SendCallback callback, std::ve
   std::error_code ec;
   validate(*message, ec);
   if (ec) {
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
 
   if (list.empty()) {
     ec = ErrorCode::NotFound;
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
diff --git a/cpp/source/rocketmq/SendContext.cpp 
b/cpp/source/rocketmq/SendContext.cpp
index 385a1a99..bd97384d 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,41 +21,44 @@
 #include "ProducerImpl.h"
 #include "PublishStats.h"
 #include "Tag.h"
-#include "TransactionImpl.h"
-#include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
-#include "rocketmq/Logger.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
+void SendContext::onSuccess(const SendResult& send_result) noexcept {
   {
     // Mark end of send-message span.
     span_.SetStatus(opencensus::trace::StatusCode::OK);
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), 
MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), 
MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), 
publisher->config().client_id},
+                                  {Tag::clientIdTag(), 
producer->config().client_id},
                                   {Tag::invocationStatusTag(), "success"},
                               });
   }
 
   // 
send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
-  std::error_code ec;
-  callback_(ec, send_receipt);
+  SendReceipt send_receipt = {};
+  send_receipt.target = send_result.target;
+  send_receipt.message_id = send_result.message_id;
+  send_receipt.transaction_id = send_result.transaction_id;
+  send_receipt.message = std::move(message_);
+  callback_(send_result.ec, send_receipt);
 }
 
 void SendContext::onFailure(const std::error_code& ec) noexcept {
@@ -65,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec) 
noexcept {
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), 
MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), 
MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), 
publisher->config().client_id},
+                                  {Tag::clientIdTag(), 
producer->config().client_id},
                                   {Tag::invocationStatusTag(), "failure"},
                               });
   }
 
-  if (++attempt_times_ >= publisher->maxAttemptTimes()) {
-    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", 
attempt_times_, publisher->maxAttemptTimes());
-    callback_(ec, {});
-    return;
-  }
-
-  std::shared_ptr<ProducerImpl> producer = producer_.lock();
-  if (!producer) {
-    SPDLOG_WARN("Producer has been destructed");
-    callback_(ec, {});
+  if (++attempt_times_ >= producer->maxAttemptTimes()) {
+    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", 
attempt_times_, producer->maxAttemptTimes());
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
   if (candidates_.empty()) {
     SPDLOG_WARN("No alternative hosts to perform additional retries");
-    callback_(ec, {});
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
@@ -106,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec) 
noexcept {
   auto ctx = shared_from_this();
   // If publish message requests are throttled, retry after backoff
   if (ErrorCode::TooManyRequests == ec) {
-    auto&& backoff = publisher->backoff(attempt_times_);
+    auto&& backoff = producer->backoff(attempt_times_);
     SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry 
after {}ms", message_->topic(),
                  message_->id(), MixAll::millisecondsOf(backoff));
     auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ClientImpl.h 
b/cpp/source/rocketmq/include/ClientImpl.h
index c266047a..d7693962 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@ public:
     client_config_.request_timeout = absl::FromChrono(request_timeout);
   }
 
-  void withSsl(bool enable) {
-    client_config_.withSsl = enable;
+  void withSsl(bool with_ssl) {
+    client_config_.withSsl = with_ssl;
   }
 
   /**
diff --git a/cpp/source/rocketmq/include/FifoContext.h 
b/cpp/source/rocketmq/include/FifoContext.h
new file mode 100644
index 00000000..55812ba9
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+struct FifoContext {
+  MessageConstPtr message;
+  SendCallback callback;
+
+  FifoContext(MessageConstPtr message, SendCallback callback);
+
+  FifoContext(FifoContext&& rhs) noexcept;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 00000000..180c3f93
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "FifoProducerPartition.h"
+#include "ProducerImpl.h"
+#include "fmt/format.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+/**
+ * Spreads messages over a fixed number of FifoProducerPartition instances that share one ProducerImpl. send() picks
+ * the partition from the hash of the message group, so messages of one group always go through the same partition.
+ */
+class FifoProducerImpl : public std::enable_shared_from_this<FifoProducerImpl> {
+public:
+  /**
+   * @param producer Producer shared by all partitions.
+   * @param concurrency Number of partitions; 0 is treated as 1 because send() reduces the hash modulo this value.
+   */
+  FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency)
+      : producer_(std::move(producer)), concurrency_(concurrency > 0 ? concurrency : 1) {
+    partitions_.reserve(concurrency_);
+    for (std::size_t i = 0; i < concurrency_; i++) {
+      partitions_.push_back(std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i)));
+    }
+  }
+
+  void send(MessageConstPtr message, SendCallback callback);
+
+  std::shared_ptr<ProducerImpl>& internalProducer() {
+    return producer_;
+  }
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+  std::size_t concurrency_;
+  std::hash<std::string> hash_fn_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h 
b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 00000000..96bb96f6
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <absl/base/internal/thread_annotations.h>
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerPartition : public 
std::enable_shared_from_this<FifoProducerPartition> {
+public:
+  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& 
name)
+      : producer_(producer), name_(std::move(name)) {
+  }
+
+  void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+  void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+  void onComplete(const std::error_code& ec, const SendReceipt& receipt, 
SendCallback& callback);
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+  absl::Mutex messages_mtx_;
+  std::atomic_bool inflight_{false};
+  std::string name_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..b572f20d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
  */
 #pragma once
 
-#include <chrono>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <system_error>
 
 #include "ClientImpl.h"
-#include "ClientManagerImpl.h"
 #include "MixAll.h"
 #include "PublishInfoCallback.h"
+#include "PublishStats.h"
 #include "SendContext.h"
 #include "TopicPublishInfo.h"
 #include "TransactionImpl.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
 #include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -53,8 +48,22 @@ public:
 
   void shutdown() override;
 
+  /**
+   * Note: the application must transfer ownership of the message to the producer, which avoids concurrent
+   * modification while the message is being sent.
+   *
+   * Regardless of the send result, the SendReceipt carries the std::unique_ptr<const Message> back, allowing the
+   * application to implement a customized retry policy.
+   */
   SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
 
+  /**
+   * Note: the application must transfer ownership of the message to the producer, which avoids concurrent
+   * modification while the message is being sent.
+   *
+   * Regardless of the send result, the SendReceipt carries the std::unique_ptr<const Message> back, allowing the
+   * application to implement a customized retry policy.
+   */
   void send(MessageConstPtr message, SendCallback callback);
 
   void setTransactionChecker(TransactionChecker checker);
@@ -64,6 +73,13 @@ public:
     return absl::make_unique<TransactionImpl>(producer);
   }
 
+  /**
+   * Note: the application must transfer ownership of the message to the producer, which avoids concurrent
+   * modification while the message is being sent.
+   *
+   * TODO: Refine this API. The current API is not good enough, as it cannot hand the message back to its caller on
+   * publish failure.
+   */
   void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
   /**
diff --git a/cpp/source/rocketmq/include/SendContext.h 
b/cpp/source/rocketmq/include/SendContext.h
index 4c05cebe..4067532b 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,16 +19,12 @@
 #include <memory>
 #include <system_error>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-#include "opencensus/trace/span.h"
-
 #include "Protocol.h"
+#include "SendResult.h"
 #include "TransactionImpl.h"
-#include "rocketmq/ErrorCode.h"
+#include "opencensus/trace/span.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
-#include "rocketmq/SendReceipt.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -47,7 +43,7 @@ public:
         span_(opencensus::trace::Span::BlankSpan()) {
   }
 
-  void onSuccess(const SendReceipt& send_receipt) noexcept;
+  void onSuccess(const SendResult& send_result) noexcept;
 
   void onFailure(const std::error_code& ec) noexcept;
 
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755

Reply via email to