This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 739f754  Prepare to sync examples (#66)
739f754 is described below

commit 739f754cd04a79f97dbbdb1bc2b8e09064c33878
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 21 09:26:40 2022 +0800

    Prepare to sync examples (#66)
    
    * Prepare to sync examples
    
    * Make topic, service access point, message body size configurable through 
command option
    
    * Add an example producer, publishing timed message
    
    * Add transactional example
    
    * Update README
---
 cpp/README.md                                      |  41 ++++-
 cpp/bazel/rocketmq_deps.bzl                        | 189 +++++++++++----------
 cpp/examples/BUILD.bazel                           | 105 ++++--------
 cpp/examples/BenchmarkPushConsumer.cpp             |  83 ---------
 cpp/examples/ExampleFifoPushConsumer.cpp           |  67 --------
 cpp/examples/ExampleProducer.cpp                   |  38 +++--
 ...ncProducer.cpp => ExampleProducerWithAsync.cpp} |  32 ++--
 ...ucer.cpp => ExampleProducerWithFifoMessage.cpp} |  29 +++-
 ...cer.cpp => ExampleProducerWithTimedMessage.cpp} |  38 +++--
 ...=> ExampleProducerWithTransactionalMessage.cpp} |  65 +++----
 cpp/examples/ExampleTransactionProducer.cpp        |  52 ------
 cpp/examples/PushConsumerWithCustomExecutor.cpp    | 147 ----------------
 cpp/examples/PushConsumerWithThrottle.cpp          |  88 ----------
 cpp/examples/SqlConsumer.cpp                       |  61 -------
 cpp/examples/SqlProducer.cpp                       |  60 -------
 cpp/include/rocketmq/Message.h                     |   9 +
 cpp/include/rocketmq/Producer.h                    |   5 +
 cpp/include/rocketmq/SendReceipt.h                 |   2 +
 cpp/include/rocketmq/Transaction.h                 |  10 --
 cpp/source/base/Message.cpp                        |   5 +
 cpp/source/base/ThreadPoolImpl.cpp                 |   2 +-
 cpp/source/client/ClientManagerImpl.cpp            |  17 +-
 cpp/source/client/TelemetryBidiReactor.cpp         |   5 +-
 cpp/source/rocketmq/Producer.cpp                   |   9 +
 cpp/source/rocketmq/ProducerImpl.cpp               |  79 +++++----
 cpp/source/rocketmq/TransactionImpl.cpp            |  25 ++-
 cpp/source/rocketmq/include/ProducerImpl.h         |  17 +-
 cpp/source/rocketmq/include/TransactionImpl.h      |  47 ++---
 cpp/source/scheduler/SchedulerImpl.cpp             |   2 +-
 29 files changed, 432 insertions(+), 897 deletions(-)

diff --git a/cpp/README.md b/cpp/README.md
index 705abe2..fa92392 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -48,16 +48,53 @@ if "com_google_googletest" not in native.existing_rules():
 
 
 1. Build
-   From the repository root, 
+   From the workspace, 
    ```
    bazel build //...
    ```
 2. Run Unit Tests
-   From the repository root,
+   From the workspace,
    ```
    bazel test //...
    ```
 
+3. Run Examples
+   From the workspace,
+
+   Publish standard messages to your topic synchronously
+   ```
+   bazel run //examples:example_producer -- --topic=YOUR_TOPIC 
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
+   ```
+   where `1024` are in bytes
+
+   ------------
+
+   
+   Publish standard messages to your topic asynchronously
+   ```
+   bazel run //examples:example_producer_with_async -- --topic=YOUR_TOPIC 
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
+   ```
+   where `1024` are in bytes
+
+   ------------
+
+
+   Publish FIFO messages to your topic
+   ```
+   bazel run //examples:example_producer_with_fifo_message -- 
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024 
--total=16
+   ```
+   where `1024` are in bytes
+
+   -----------
+
+   Publish Transactional messages
+   ```
+   bazel run //examples:example_producer_with_transactional_message -- 
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024 
--total=16
+   ```
+   where `1024` are in bytes
+
+   
+
 ### IDE
 [Visual Studio Code](https://code.visualstudio.com/) + 
[Clangd](https://clangd.llvm.org/) is the recommended development toolset. 
 1. VSCode + Clangd
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 939b97c..9306f94 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -8,86 +8,75 @@ def rocketmq_deps():
         name = "opentelementry_api",
         actual = "@com_github_opentelemetry//api:api",
     )
+    
+    maybe(
+        http_archive,
+        name = "com_google_googletest",
+        sha256 = 
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
+        strip_prefix = "googletest-release-1.11.0",
+        urls = [
+        
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz";,
+        
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz";,
+        ],
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gulrak_filesystem",
+        strip_prefix = "filesystem-1.5.0",
+        sha256 = 
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz";,
+            "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gabime_spdlog",
+        strip_prefix = "spdlog-1.9.2",
+        sha256 = 
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz";,
+            "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_fmtlib_fmt",
+        strip_prefix = "fmt-8.0.1",
+        sha256 = 
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz";,
+            "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_google_protobuf",
+        sha256 = 
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
+        strip_prefix = "protobuf-3.20.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz";,
+            
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz";,
+        ],
+    )
 
-    if "rules_python" not in native.existing_rules():
-        http_archive(
-            name = "rules_python",
-            sha256 = 
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
-            strip_prefix = "rules_python-0.8.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz";,
-                
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz";,
-            ],
-        )
-
-    if "com_google_googletest" not in native.existing_rules():
-         http_archive(
-             name = "com_google_googletest",
-             sha256 = 
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
-             strip_prefix = "googletest-release-1.11.0",
-             urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz";,
-                
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz";,
-             ],
-         )
-
-    if "com_github_gulrak_filesystem" not in native.existing_rules():
-        http_archive(
-            name = "com_github_gulrak_filesystem",
-            strip_prefix = "filesystem-1.5.0",
-            sha256 = 
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz";,
-                "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
-        )
-
-    if "com_github_gabime_spdlog" not in native.existing_rules():
-        http_archive(
-            name = "com_github_gabime_spdlog",
-            strip_prefix = "spdlog-1.9.2",
-            sha256 = 
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz";,
-                
"https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
-        )
-
-    if "com_github_fmtlib_fmt" not in native.existing_rules():
-        http_archive(
-            name = "com_github_fmtlib_fmt",
-            strip_prefix = "fmt-8.0.1",
-            sha256 = 
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz";,
-                "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
-        )
-
-    if "com_google_protobuf" not in native.existing_rules():
-        http_archive(
-            name = "com_google_protobuf",
-            sha256 = 
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
-            strip_prefix = "protobuf-3.20.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz";,
-                
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz";,
-            ],
-        )
-
-    if "rules_proto_grpc" not in native.existing_rules():
-        http_archive(
-            name = "rules_proto_grpc",
-            sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
-            strip_prefix = "rules_proto_grpc-4.1.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz";,
-                
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz";
-            ],
-        )
+    maybe(
+        http_archive,
+        name = "rules_proto_grpc",
+        sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
+        strip_prefix = "rules_proto_grpc-4.1.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz";,
+            
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz";
+        ],
+    )
 
     maybe(
         http_archive,
@@ -100,16 +89,27 @@ def rocketmq_deps():
         strip_prefix = "opencensus-cpp-0.4.1",
     )
 
-    if "com_google_absl" not in native.existing_rules():
-        http_archive(
-            name = "com_google_absl",
-            sha256 = 
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
-            strip_prefix = "abseil-cpp-20211102.0",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz";,
-                
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz";,
-            ],
-        )
+    maybe(
+        http_archive,
+        name = "com_google_absl",
+        sha256 = 
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
+        strip_prefix = "abseil-cpp-20211102.0",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz";,
+            
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz";,
+        ],
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gflags_gflags",
+        strip_prefix = "gflags-2.2.2",
+        sha256 = 
"34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz";,
+            "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz";,
+        ]
+    )
 
     maybe(
         http_archive,
@@ -145,6 +145,17 @@ def rocketmq_deps():
         strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
     )
 
+    maybe(
+        http_archive,
+        name = "rules_python",
+        sha256 = 
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
+        strip_prefix = "rules_python-0.8.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz";,
+            
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz";,
+        ],
+    )
+
     maybe(
         http_archive,
         name = "rules_swift",
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index c2dbab4..939f0a2 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -23,26 +23,51 @@ cc_binary(
     ],
     deps = [
         "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
     ],
 )
 
 cc_binary(
-    name = "example_fifo_producer",
+    name = "example_producer_with_fifo_message",
     srcs = [
-        "ExampleFifoProducer.cpp",
+        "ExampleProducerWithFifoMessage.cpp",
     ],
     deps = [
         "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
     ],
 )
 
 cc_binary(
-    name = "example_async_producer",
+    name = "example_producer_with_async",
     srcs = [
-        "ExampleAsyncProducer.cpp",
+        "ExampleProducerWithAsync.cpp",
     ],
     deps = [
         "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
+    ],
+)
+
+cc_binary(
+    name = "example_producer_with_timed_message",
+    srcs = [
+        "ExampleProducerWithTimedMessage.cpp",
+    ],
+    deps = [
+        "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
+    ],
+)
+
+cc_binary(
+    name = "example_producer_with_transactional_message",
+    srcs = [
+        "ExampleProducerWithTransactionalMessage.cpp",
+    ],
+    deps = [
+        "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
     ],
 )
 
@@ -56,16 +81,6 @@ cc_binary(
     ],
 )
 
-# cc_binary(
-#     name = "sql_producer",
-#     srcs = [
-#         "SqlProducer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
 cc_binary(
     name = "example_push_consumer",
     srcs = [
@@ -74,64 +89,4 @@ cc_binary(
     deps = [
         "//source/rocketmq:rocketmq_library",
     ],
-)
-
-# cc_binary(
-#     name = "example_fifo_push_consumer",
-#     srcs = [
-#         "ExampleFifoPushConsumer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "push_consumer_with_custom_executor",
-#     srcs = [
-#         "PushConsumerWithCustomExecutor.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "push_consumer_with_throttle",
-#     srcs = [
-#         "PushConsumerWithThrottle.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "sql_consumer",
-#     srcs = [
-#         "SqlConsumer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "benchmark_push_consumer",
-#     srcs = [
-#         "BenchmarkPushConsumer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "example_transaction_producer",
-#     srcs = [
-#         "ExampleTransactionProducer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
\ No newline at end of file
+)
\ No newline at end of file
diff --git a/cpp/examples/BenchmarkPushConsumer.cpp 
b/cpp/examples/BenchmarkPushConsumer.cpp
deleted file mode 100644
index 400fa80..0000000
--- a/cpp/examples/BenchmarkPushConsumer.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-using namespace rocketmq;
-
-class CounterMessageListener : public StandardMessageListener {
-public:
-  explicit CounterMessageListener(std::atomic_long& counter) : 
counter_(counter) {
-  }
-
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    counter_.fetch_add(msgs.size());
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::atomic_long& counter_;
-};
-
-int main(int argc, char* argv[]) {
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  std::atomic_long counter(0);
-
-  DefaultMQPushConsumer push_consumer("CID_sample");
-  MessageListener* listener = new CounterMessageListener(counter);
-
-  push_consumer.setGroupName("CID_sample");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TopicTest", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.start();
-
-  std::atomic_bool stopped(false);
-  std::thread report_thread([&counter, &stopped]() {
-    while (!stopped) {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      long qps;
-      while (true) {
-        qps = counter.load(std::memory_order_relaxed);
-        if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
-          break;
-        }
-      }
-      std::cout << "QPS: " << qps << std::endl;
-    }
-  });
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-  stopped.store(true);
-
-  if (report_thread.joinable()) {
-    report_thread.join();
-  }
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/ExampleFifoPushConsumer.cpp 
b/cpp/examples/ExampleFifoPushConsumer.cpp
deleted file mode 100644
index df039d7..0000000
--- a/cpp/examples/ExampleFifoPushConsumer.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-#include "rocketmq/Logger.h"
-
-#include "rocketmq/MessageListener.h"
-#include "spdlog/spdlog.h"
-
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-using namespace rocketmq;
-
-class SampleMQMessageListener : public FifoMessageListener {
-public:
-  ConsumeMessageResult consumeMessage(const MQMessageExt& message) override {
-    SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK", 
message.getTopic(), message.getMsgId());
-    std::cout << "Consume Message[MsgId=" << message.getMsgId() << "] OK. Body 
Size: " << message.getBody().size()
-              << std::endl;
-    // std::this_thread::sleep_for(std::chrono::seconds(1));
-    return ConsumeMessageResult::SUCCESS;
-  }
-};
-
-int main(int argc, char* argv[]) {
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  const char* group_id = "GID_lingchu_test_order";
-  const char* topic = "lingchu_test_order_topic";
-  const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
-
-  DefaultMQPushConsumer push_consumer(group_id);
-  push_consumer.setResourceNamespace(resource_namespace);
-  
push_consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
-  push_consumer.setNamesrvAddr("120.25.100.131:8081");
-  FifoMessageListener* listener = new SampleMQMessageListener();
-  push_consumer.setInstanceName("instance_0");
-  push_consumer.subscribe(topic, "*");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.setConsumeThreadCount(4);
-  push_consumer.start();
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 9bdb9b4..b82cfce 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -18,8 +18,10 @@
 #include <atomic>
 #include <iostream>
 #include <random>
+#include <string>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -46,12 +48,17 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
+DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
 int main(int argc, char* argv[]) {
-  const char* topic = "lingchu_normal_topic";
-  const char* name_server = "121.196.167.124:8081";
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -69,16 +76,25 @@ int main(int argc, char* argv[]) {
 
   std::thread stats_thread(stats_lambda);
 
-  std::string body = randomString(1024 * 4);
-  std::cout << "Message body size: " << body.length() << std::endl;
+  std::string body = randomString(FLAGS_message_body_size);
 
   try {
-    for (int i = 0; i < 256; ++i) {
-      auto message = 
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withBody(body)
+                         .build();
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
-      std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
-      count++;
+      if (ec) {
+        std::cerr << "Failed to publish message to " << FLAGS_topic << ". 
Cause: " << ec.message() << std::endl;
+      } else {
+        std::cout << "Publish message to " << FLAGS_topic << " OK. Message-ID: 
" << send_receipt.message_id
+                  << std::endl;
+        count++;
+      }
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
@@ -88,7 +104,5 @@ int main(int argc, char* argv[]) {
     stats_thread.join();
   }
 
-  // std::this_thread::sleep_for(std::chrono::seconds(1));
-
   return EXIT_SUCCESS;
 }
\ No newline at end of file
diff --git a/cpp/examples/ExampleAsyncProducer.cpp 
b/cpp/examples/ExampleProducerWithAsync.cpp
similarity index 73%
copy from cpp/examples/ExampleAsyncProducer.cpp
copy to cpp/examples/ExampleProducerWithAsync.cpp
index 2d28d74..0463cdd 100644
--- a/cpp/examples/ExampleAsyncProducer.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -20,8 +20,10 @@
 #include <iostream>
 #include <mutex>
 #include <random>
+#include <string>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -48,12 +50,16 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-int main(int argc, char* argv[]) {
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "11.166.42.94:8081";
+DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -71,10 +77,9 @@ int main(int argc, char* argv[]) {
 
   std::thread stats_thread(stats_lambda);
 
-  std::string body = randomString(1024 * 4);
+  std::string body = randomString(FLAGS_message_body_size);
   std::cout << "Message body size: " << body.length() << std::endl;
 
-  std::size_t total = 256;
   std::size_t completed = 0;
   std::mutex mtx;
   std::condition_variable cv;
@@ -85,19 +90,24 @@ int main(int argc, char* argv[]) {
       completed++;
       count++;
       std::cout << "Message[id=" << receipt.message_id << "] sent" << 
std::endl;
-      if (completed >= total) {
+      if (completed >= FLAGS_total) {
         cv.notify_all();
       }
     };
 
-    for (std::size_t i = 0; i < total; ++i) {
-      auto message = 
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withBody(body)
+                         .build();
       producer.send(std::move(message), send_callback);
     }
 
     {
       std::unique_lock<std::mutex> lk(mtx);
-      cv.wait(lk, [&]() { return completed >= total; });
+      cv.wait(lk, [&]() { return completed >= FLAGS_total; });
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/examples/ExampleFifoProducer.cpp 
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
similarity index 73%
copy from cpp/examples/ExampleFifoProducer.cpp
copy to cpp/examples/ExampleProducerWithFifoMessage.cpp
index 947ae5d..e522ad4 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -18,8 +18,10 @@
 #include <atomic>
 #include <iostream>
 #include <random>
+#include <string>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -46,12 +48,23 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
+DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
 int main(int argc, char* argv[]) {
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "11.166.42.94:8081";
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Adjust log level for file/console sinks
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
+  logger.init();
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -69,17 +82,17 @@ int main(int argc, char* argv[]) {
 
   std::thread stats_thread(stats_lambda);
 
-  std::string body = randomString(1024 * 4);
+  std::string body = randomString(FLAGS_message_body_size);
   std::cout << "Message body size: " << body.length() << std::endl;
 
   try {
-    for (int i = 0; i < 256; ++i) {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
       auto message = Message::newBuilder()
-                         .withTopic(topic)
+                         .withTopic(FLAGS_topic)
                          .withTag("TagA")
                          .withKeys({"Key-0"})
                          .withBody(body)
-                         .withGroup("message-group-0")
+                         .withGroup("message-group" + std::to_string(i % 10))
                          .build();
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
diff --git a/cpp/examples/ExampleFifoProducer.cpp 
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
similarity index 67%
rename from cpp/examples/ExampleFifoProducer.cpp
rename to cpp/examples/ExampleProducerWithTimedMessage.cpp
index 947ae5d..c44da05 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -16,10 +16,14 @@
  */
 #include <algorithm>
 #include <atomic>
+#include <chrono>
+#include <cstddef>
 #include <iostream>
 #include <random>
+#include <string>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -46,12 +50,22 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
+DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
 int main(int argc, char* argv[]) {
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "11.166.42.94:8081";
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
+  logger.init();
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -69,17 +83,18 @@ int main(int argc, char* argv[]) {
 
   std::thread stats_thread(stats_lambda);
 
-  std::string body = randomString(1024 * 4);
-  std::cout << "Message body size: " << body.length() << std::endl;
+  std::string body = randomString(FLAGS_message_body_size);
 
   try {
-    for (int i = 0; i < 256; ++i) {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
       auto message = Message::newBuilder()
-                         .withTopic(topic)
+                         .withTopic(FLAGS_topic)
                          .withTag("TagA")
-                         .withKeys({"Key-0"})
+                         .withKeys({"Key-" + std::to_string(i)})
                          .withBody(body)
-                         .withGroup("message-group-0")
+                         .availableAfter(
+                             std::chrono::system_clock::now() +
+                             std::chrono::seconds(10))  // This message would 
be available to consumers after 10 seconds
                          .build();
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
@@ -89,11 +104,12 @@ int main(int argc, char* argv[]) {
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
   }
-
   stopped.store(true, std::memory_order_relaxed);
   if (stats_thread.joinable()) {
     stats_thread.join();
   }
 
+  // std::this_thread::sleep_for(std::chrono::seconds(1));
+
   return EXIT_SUCCESS;
 }
\ No newline at end of file
diff --git a/cpp/examples/ExampleAsyncProducer.cpp 
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
similarity index 59%
rename from cpp/examples/ExampleAsyncProducer.cpp
rename to cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index 2d28d74..c740533 100644
--- a/cpp/examples/ExampleAsyncProducer.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -16,12 +16,12 @@
  */
 #include <algorithm>
 #include <atomic>
-#include <condition_variable>
 #include <iostream>
-#include <mutex>
 #include <random>
+#include <string>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -48,12 +48,29 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
+DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
 int main(int argc, char* argv[]) {
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "11.166.42.94:8081";
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
+  logger.init();
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+  auto checker = [](const Message& message) -> TransactionState {
+    std::cout << "Recovery orphan transactional message[topic=" << 
message.topic() << ", MsgId=" << message.id()
+              << ", txn-id=" << message.extension().transaction_id << 
std::endl;
+    return TransactionState::COMMIT;
+  };
+
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withTransactionChecker(checker)
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -71,33 +88,21 @@ int main(int argc, char* argv[]) {
 
   std::thread stats_thread(stats_lambda);
 
-  std::string body = randomString(1024 * 4);
-  std::cout << "Message body size: " << body.length() << std::endl;
-
-  std::size_t total = 256;
-  std::size_t completed = 0;
-  std::mutex mtx;
-  std::condition_variable cv;
+  std::string body = randomString(FLAGS_message_body_size);
 
   try {
-    auto send_callback = [&](const std::error_code& ec, const SendReceipt& 
receipt) {
-      std::unique_lock<std::mutex> lk(mtx);
-      completed++;
-      count++;
-      std::cout << "Message[id=" << receipt.message_id << "] sent" << 
std::endl;
-      if (completed >= total) {
-        cv.notify_all();
-      }
-    };
+    auto message = 
Message::newBuilder().withTopic(FLAGS_topic).withTag("TagA").withBody(body).build();
+    auto transaction = producer.beginTransaction();
+    std::error_code ec;
 
-    for (std::size_t i = 0; i < total; ++i) {
-      auto message = 
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
-      producer.send(std::move(message), send_callback);
-    }
+    producer.send(std::move(message), ec, *transaction);
 
-    {
-      std::unique_lock<std::mutex> lk(mtx);
-      cv.wait(lk, [&]() { return completed >= total; });
+    if (!ec) {
+      if (!transaction->commit()) {
+        std::cerr << "Failed to commit message" << std::endl;
+      }
+    } else {
+      std::cerr << "Failed to send transactional message to topic: " << 
FLAGS_topic << std::endl;
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
@@ -107,7 +112,5 @@ int main(int argc, char* argv[]) {
     stats_thread.join();
   }
 
-  std::this_thread::sleep_for(std::chrono::seconds(1));
-
   return EXIT_SUCCESS;
 }
\ No newline at end of file
diff --git a/cpp/examples/ExampleTransactionProducer.cpp 
b/cpp/examples/ExampleTransactionProducer.cpp
deleted file mode 100644
index 1041b92..0000000
--- a/cpp/examples/ExampleTransactionProducer.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQProducer.h"
-#include <cstdlib>
-
-using namespace ROCKETMQ_NAMESPACE;
-
-int main(int argc, char* argv[]) {
-  DefaultMQProducer producer("TestGroup");
-
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "47.98.116.189:80";
-
-  producer.setNamesrvAddr(name_server);
-  producer.compressBodyThreshold(256);
-  const char* resource_namespace = "MQ_INST_1080056302921134_BXuIbML7";
-  producer.setRegion("cn-hangzhou-pre");
-  producer.setResourceNamespace(resource_namespace);
-  
producer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
-
-  MQMessage message;
-  message.setTopic(topic);
-  message.setTags("TagA");
-  message.setKey("Yuck! Why-plural?");
-  message.setBody("ABC");
-
-  producer.start();
-
-  auto transaction = producer.prepare(message);
-
-  transaction->commit();
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-
-  producer.shutdown();
-
-  return EXIT_SUCCESS;
-}
\ No newline at end of file
diff --git a/cpp/examples/PushConsumerWithCustomExecutor.cpp 
b/cpp/examples/PushConsumerWithCustomExecutor.cpp
deleted file mode 100644
index 82dd57d..0000000
--- a/cpp/examples/PushConsumerWithCustomExecutor.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-#include "rocketmq/State.h"
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ExecutorImpl {
-public:
-  ExecutorImpl() : state_(State::CREATED) {
-  }
-
-  virtual ~ExecutorImpl() {
-    switch (state_.load(std::memory_order_relaxed)) {
-      case CREATED:
-      case STOPPING:
-      case STOPPED:
-        break;
-
-      case STARTING:
-      case STARTED:
-        state_.store(State::STOPPED);
-        if (worker_.joinable()) {
-          worker_.join();
-        }
-        break;
-    }
-  }
-
-  void submit(const std::function<void(void)>& task) {
-    if (State::STOPPED == state_.load(std::memory_order_relaxed)) {
-      return;
-    }
-
-    {
-      std::unique_lock<std::mutex> lock(task_mtx_);
-      tasks_.push_back(task);
-    }
-    cv_.notify_one();
-  }
-
-  void start() {
-    State expected = State::CREATED;
-    if (state_.compare_exchange_strong(expected, State::STARTING)) {
-      worker_ = std::thread(std::bind(&ExecutorImpl::loop, this));
-      state_.store(State::STARTED);
-    }
-  }
-
-  void stop() {
-    state_.store(State::STOPPED);
-    if (worker_.joinable()) {
-      worker_.join();
-    }
-  }
-
-private:
-  void loop() {
-    while (state_.load(std::memory_order_relaxed) != State::STOPPED) {
-      std::function<void(void)> func;
-      {
-        std::unique_lock<std::mutex> lk(task_mtx_);
-        if (!tasks_.empty()) {
-          func = tasks_.back();
-        }
-      }
-
-      if (func) {
-        func();
-      } else {
-        std::unique_lock<std::mutex> lk(task_mtx_);
-        cv_.wait_for(lk, std::chrono::seconds(3),
-                     [&]() { return state_.load(std::memory_order_relaxed) == 
State::STOPPED || !tasks_.empty(); });
-      }
-    }
-  }
-
-  std::atomic<State> state_;
-  std::vector<std::function<void(void)>> tasks_;
-  std::mutex task_mtx_;
-  std::condition_variable cv_;
-  std::thread worker_;
-};
-
-class SampleMQMessageListener : public StandardMessageListener {
-public:
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    std::lock_guard<std::mutex> lk(console_mtx_);
-    for (const MQMessageExt& msg : msgs) {
-      std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() 
<< ", Body=" << msg.getBody()
-                << std::endl;
-    }
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::mutex console_mtx_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
-  using namespace ROCKETMQ_NAMESPACE;
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  DefaultMQPushConsumer push_consumer("TestGroup");
-  MessageListener* listener = new SampleMQMessageListener;
-
-  auto pool = new ExecutorImpl;
-  pool->start();
-  push_consumer.setCustomExecutor(std::bind(&ExecutorImpl::submit, pool, 
std::placeholders::_1));
-  push_consumer.setGroupName("TestGroup");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TestTopic", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.start();
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-  pool->stop();
-  delete pool;
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/PushConsumerWithThrottle.cpp 
b/cpp/examples/PushConsumerWithThrottle.cpp
deleted file mode 100644
index c03b6c5..0000000
--- a/cpp/examples/PushConsumerWithThrottle.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class CounterMessageListener : public StandardMessageListener {
-public:
-  explicit CounterMessageListener(std::atomic_long& counter) : 
counter_(counter) {
-  }
-
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    counter_.fetch_add(msgs.size());
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::atomic_long& counter_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
-
-  using namespace ROCKETMQ_NAMESPACE;
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  std::atomic_long counter(0);
-
-  DefaultMQPushConsumer push_consumer("TestGroup");
-  MessageListener* listener = new CounterMessageListener(counter);
-
-  push_consumer.setGroupName("TestGroup");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TestTopic", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.setThrottle("TestTopic", 20);
-  push_consumer.start();
-
-  std::atomic_bool stopped(false);
-  std::thread report_thread([&counter, &stopped]() {
-    while (!stopped) {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      long qps;
-      while (true) {
-        qps = counter.load(std::memory_order_relaxed);
-        if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
-          break;
-        }
-      }
-      std::cout << "QPS: " << qps << std::endl;
-    }
-  });
-
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-  stopped.store(true);
-
-  if (report_thread.joinable()) {
-    report_thread.join();
-  }
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/SqlConsumer.cpp b/cpp/examples/SqlConsumer.cpp
deleted file mode 100644
index c07e4a0..0000000
--- a/cpp/examples/SqlConsumer.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-using namespace rocketmq;
-
-class SampleMQMessageListener : public StandardMessageListener {
-public:
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    std::lock_guard<std::mutex> lk(console_mtx_);
-    for (const MQMessageExt& msg : msgs) {
-      std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() 
<< ", Tag=" << msg.getTags()
-                << ", a=" << msg.getProperty("a") << ", Body=" << 
msg.getBody() << std::endl;
-    }
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::mutex console_mtx_;
-};
-
-int main(int argc, char* argv[]) {
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  DefaultMQPushConsumer push_consumer("TestGroup");
-  MessageListener* listener = new SampleMQMessageListener;
-
-  push_consumer.setGroupName("TestGroup");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  std::string sql_filter("(TAGS is not null and TAGS in ('TagA', 'TagB')) and 
(a is not null and a between 0 and 3)");
-  push_consumer.subscribe("TestTopic", sql_filter, ExpressionType::SQL92);
-  // push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.start();
-
-  std::this_thread::sleep_for(std::chrono::seconds(30));
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/SqlProducer.cpp b/cpp/examples/SqlProducer.cpp
deleted file mode 100644
index bc0fb47..0000000
--- a/cpp/examples/SqlProducer.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQProducer.h"
-#include <iostream>
-
-using namespace rocketmq;
-
-int main(int argc, char* argv[]) {
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  DefaultMQProducer producer("PID_sample");
-  producer.setNamesrvAddr("11.167.164.105:9876");
-
-  MQMessage message;
-  message.setTopic("TestTopic");
-  try {
-    producer.start();
-    for (int i = 0; i < 8; ++i) {
-      std::string body = std::to_string(i);
-      message.setBody(body);
-      message.setProperty("a", std::to_string(i % 5));
-      switch (i % 3) {
-        case 0:
-          message.setTags("TagA");
-          break;
-        case 1:
-          message.setTags("TagB");
-          break;
-        case 2:
-          message.setTags("TagC");
-          break;
-      }
-      SendResult sendResult = producer.send(message);
-      std::cout << "Message sent with msgId=" << sendResult.getMsgId()
-                << ", Queue=" << sendResult.getMessageQueue().simpleName() << 
std::endl;
-      std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    }
-  } catch (...) {
-    std::cerr << "Ah...No!!!" << std::endl;
-  }
-  producer.shutdown();
-  return EXIT_SUCCESS;
-}
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h
index 642f322..73336dc 100644
--- a/cpp/include/rocketmq/Message.h
+++ b/cpp/include/rocketmq/Message.h
@@ -48,6 +48,7 @@ struct Extension {
   std::int64_t offset{0};
   std::string nonce;
   std::string transaction_id;
+  bool transactional{false};
 };
 
 class Message {
@@ -163,6 +164,14 @@ public:
 
   MessageBuilder& withProperties(std::unordered_map<std::string, std::string> 
properties);
 
+  /**
+   * @brief Specify timepoint after which the message would be available to 
subscribers.
+   *
+   * @param delivery_timepoint
+   * @return MessageBuilder&
+   */
+  MessageBuilder& availableAfter(std::chrono::system_clock::time_point 
delivery_timepoint);
+
   MessageConstPtr build();
 
 private:
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index cb70065..42004eb 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -28,6 +28,7 @@
 #include "Message.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
+#include "Transaction.h"
 #include "TransactionChecker.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -65,6 +66,10 @@ public:
    */
   void send(MessageConstPtr message, const SendCallback& callback) noexcept;
 
+  std::unique_ptr<Transaction> beginTransaction();
+
+  void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
+
 private:
   explicit Producer(std::shared_ptr<ProducerImpl> impl) : 
impl_(std::move(impl)) {
   }
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/include/rocketmq/SendReceipt.h
index 06a01cc..489df5e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -28,6 +28,8 @@ struct SendReceipt {
   std::string message_id;
 
   std::string transaction_id;
+
+  std::string target;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Transaction.h 
b/cpp/include/rocketmq/Transaction.h
index 2728d12..96cee61 100644
--- a/cpp/include/rocketmq/Transaction.h
+++ b/cpp/include/rocketmq/Transaction.h
@@ -33,16 +33,6 @@ public:
   virtual bool commit() = 0;
 
   virtual bool rollback() = 0;
-
-  virtual const std::string& topic() const = 0;
-
-  virtual const std::string& messageId() const = 0;
-
-  virtual const std::string& transactionId() const = 0;
-
-  virtual const std::string& traceContext() const = 0;
-
-  virtual const std::string& endpoint() const = 0;
 };
 
 using TransactionPtr = std::unique_ptr<Transaction>;
diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index f20b4cd..e89af54 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -67,6 +67,11 @@ MessageBuilder& 
MessageBuilder::withProperties(std::unordered_map<std::string, s
   return *this;
 }
 
+MessageBuilder& 
MessageBuilder::availableAfter(std::chrono::system_clock::time_point 
delivery_timepoint) {
+  message_->delivery_timestamp_ = delivery_timepoint;
+  return *this;
+}
+
 MessageConstPtr MessageBuilder::build() {
   return std::move(message_);
 }
diff --git a/cpp/source/base/ThreadPoolImpl.cpp 
b/cpp/source/base/ThreadPoolImpl.cpp
index 3befa9c..f07ba8d 100644
--- a/cpp/source/base/ThreadPoolImpl.cpp
+++ b/cpp/source/base/ThreadPoolImpl.cpp
@@ -59,7 +59,7 @@ void ThreadPoolImpl::start() {
         }
 #endif
         if (State::STARTED != state_.load(std::memory_order_relaxed)) {
-          SPDLOG_INFO("A thread-pool worker quit");
+          SPDLOG_DEBUG("One thread-pool worker quit");
           break;
         }
       }
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index 44accf1..115fbd4 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -281,7 +281,7 @@ void ClientManagerImpl::doHeartbeat() {
 bool ClientManagerImpl::send(const std::string& target_host, const Metadata& 
metadata, SendMessageRequest& request,
                              SendCallback cb) {
   assert(cb);
-  SPDLOG_DEBUG("Prepare to send message to {} asynchronously", target_host);
+  SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", 
target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
   // Invocation context will be deleted in its onComplete() method.
   auto invocation_context = new InvocationContext<SendMessageResponse>();
@@ -293,8 +293,8 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
 
   const std::string& topic = request.messages().begin()->topic().name();
   std::weak_ptr<ClientManager> client_manager(shared_from_this());
-  auto completion_callback = [topic, cb,
-                              client_manager](const 
InvocationContext<SendMessageResponse>* invocation_context) {
+  auto completion_callback = [topic, cb, client_manager,
+                              target_host](const 
InvocationContext<SendMessageResponse>* invocation_context) {
     ClientManagerPtr client_manager_ptr = client_manager.lock();
     if (!client_manager_ptr) {
       return;
@@ -305,7 +305,8 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
       return;
     }
 
-    SendReceipt send_receipt;
+    SendReceipt send_receipt = {};
+    send_receipt.target = target_host;
     std::error_code ec;
     if (!invocation_context->status.ok()) {
       SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: 
{}, gRPC error message: {}",
@@ -319,7 +320,13 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
     auto&& status = invocation_context->response.status();
     switch (invocation_context->response.status().code()) {
       case rmq::Code::OK: {
-        send_receipt.message_id = 
invocation_context->response.entries().begin()->message_id();
+        if (!invocation_context->response.entries().empty()) {
+          auto first = invocation_context->response.entries().begin();
+          send_receipt.message_id = first->message_id();
+          send_receipt.transaction_id = first->transaction_id();
+        } else {
+          SPDLOG_ERROR("Unexpected send-message-response: {}", 
invocation_context->response.DebugString());
+        }
         break;
       }
 
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index 20860e4..995df3a 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -97,7 +97,10 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
 void TelemetryBidiReactor::OnReadDone(bool ok) {
   SPDLOG_DEBUG("OnReadDone: ok={}", ok);
   if (!ok) {
-    SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
+    if (client_.lock()) {
+      SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
+    }
+
     {
       absl::MutexLock lk(&stream_state_mtx_);
       stream_state_ = StreamState::ReadDone;
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index cf2b4da..ce007bc 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -63,6 +63,14 @@ void Producer::send(MessageConstPtr message, const 
SendCallback& callback) noexc
   impl_->send(std::move(message), callback);
 }
 
+std::unique_ptr<Transaction> Producer::beginTransaction() {
+  return impl_->beginTransaction();
+}
+
+void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction) {
+  impl_->send(std::move(message), ec, transaction);
+}
+
 ProducerBuilder Producer::newBuilder() {
   return {};
 }
@@ -83,6 +91,7 @@ ProducerBuilder& ProducerBuilder::withTopics(const 
std::vector<std::string>& top
 }
 
 ProducerBuilder& ProducerBuilder::withTransactionChecker(const 
TransactionChecker& checker) {
+  impl_->transaction_checker_ = checker;
   return *this;
 }
 
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index 2cd4399..64bbd18 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -16,6 +16,8 @@
  */
 #include "ProducerImpl.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include <atomic>
 #include <cassert>
 #include <chrono>
@@ -161,6 +163,8 @@ void ProducerImpl::wrapSendMessageRequest(const Message& 
message, SendMessageReq
     system_properties->set_message_type(rmq::MessageType::DELAY);
   } else if (message.group().has_value()) {
     system_properties->set_message_type(rmq::MessageType::FIFO);
+  } else if (message.extension().transactional) {
+    system_properties->set_message_type(rmq::MessageType::TRANSACTION);
   } else {
     system_properties->set_message_type(rmq::MessageType::NORMAL);
   }
@@ -215,7 +219,7 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   bool          completed = false;
   SendReceipt   send_receipt;
 
-  // Define callback procedureq
+  // Define callback
   auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) {
     ec = code;
     send_receipt = receipt;
@@ -359,17 +363,15 @@ void ProducerImpl::send0(MessageConstPtr message, 
SendCallback callback, std::ve
 
   auto context = std::make_shared<SendContext>(shared_from_this(), 
std::move(message), callback, std::move(list));
   sendImpl(context);
-  // const_cast<Message&>(message).traceContext(
-  //     
opencensus::trace::propagation::ToTraceParentHeader(context->span().context()));
 }
 
-bool ProducerImpl::endTransaction0(const Transaction& transaction, 
TransactionState resolution) {
+bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, 
TransactionState resolution) {
   EndTransactionRequest request;
-  const std::string& topic = transaction.topic();
+  const std::string& topic = transaction.topic;
   request.mutable_topic()->set_name(topic);
   request.mutable_topic()->set_resource_namespace(resourceNamespace());
-  request.set_message_id(transaction.messageId());
-  request.set_transaction_id(transaction.messageId());
+  request.set_message_id(transaction.message_id);
+  request.set_transaction_id(transaction.transaction_id);
 
   std::string action;
   switch (resolution) {
@@ -387,14 +389,14 @@ bool ProducerImpl::endTransaction0(const Transaction& 
transaction, TransactionSt
   bool completed = false;
   bool success = false;
   auto span = opencensus::trace::Span::BlankSpan();
-  if (!transaction.traceContext().empty() && client_config_.sampler_) {
+  if (!transaction.trace_context.empty() && client_config_.sampler_) {
     // Trace transactional message
     opencensus::trace::SpanContext span_context =
-        
opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext());
+        
opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context);
     std::string trace_operation_name = TransactionState::COMMIT == resolution
                                            ? 
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
                                            : 
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
-    std::string span_name = resourceNamespace() + "/" + transaction.topic() + 
" " + trace_operation_name;
+    std::string span_name = resourceNamespace() + "/" + transaction.topic + " 
" + trace_operation_name;
     if (span_context.IsValid()) {
       span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, 
span_context, {client_config_.sampler_.get()});
     } else {
@@ -407,7 +409,7 @@ bool ProducerImpl::endTransaction0(const Transaction& 
transaction, TransactionSt
 
   auto mtx = std::make_shared<absl::Mutex>();
   auto cv = std::make_shared<absl::CondVar>();
-  const auto& endpoint = transaction.endpoint();
+  const auto& endpoint = transaction.target;
   std::weak_ptr<ProducerImpl> publisher(shared_from_this());
 
   auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, 
const EndTransactionResponse& response) {
@@ -434,8 +436,8 @@ bool ProducerImpl::endTransaction0(const Transaction& 
transaction, TransactionSt
     }
   };
 
-  client_manager_->endTransaction(transaction.endpoint(), metadata, request,
-                                  
absl::ToChronoMilliseconds(requestTimeout()), cb);
+  client_manager_->endTransaction(transaction.target, metadata, request, 
absl::ToChronoMilliseconds(requestTimeout()),
+                                  cb);
   {
     absl::MutexLock lk(mtx.get());
     cv->Wait(mtx.get());
@@ -458,28 +460,33 @@ void ProducerImpl::isolateEndpoint(const std::string& 
target) {
   isolated_endpoints_.insert(target);
 }
 
-std::unique_ptr<TransactionImpl> ProducerImpl::prepare(MessageConstPtr 
message, std::error_code& ec) {
-  std::weak_ptr<ProducerImpl> producer(shared_from_this());
-  auto transaction = absl::make_unique<TransactionImpl>(message->topic(), 
message->id(),
-                                                        
message->traceContext().value_or(""), producer);
-  SendReceipt send_receipt = send(std::move(message), ec);
-  if (ec) {
-    return nullptr;
+void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, 
Transaction& transaction) {
+  MiniTransaction mini = {};
+  mini.topic = message->topic();
+  mini.trace_context = message->traceContext().value_or("");
+
+  if (message->group().has_value()) {
+    ec = ErrorCode::MessagePropertyConflictWithType;
+    SPDLOG_WARN("FIFO message may not be transactional");
+    return;
   }
 
-  transaction->transactionId(send_receipt.transaction_id);
+  if (message->deliveryTimestamp().has_value()) {
+    ec = ErrorCode::MessagePropertyConflictWithType;
+    SPDLOG_WARN("Timed message may not be transactional");
+    return;
+  }
 
-  // TODO: endpoint id
-  // transaction->endpoint(xxx);
-  return transaction;
-}
+  Message* msg = const_cast<Message*>(message.get());
+  msg->mutableExtension().transactional = true;
 
-bool ProducerImpl::commit(const Transaction& transaction) {
-  return endTransaction0(transaction, TransactionState::COMMIT);
-}
+  SendReceipt send_receipt = send(std::move(message), ec);
 
-bool ProducerImpl::rollback(const Transaction& transaction) {
-  return endTransaction0(transaction, TransactionState::ROLLBACK);
+  mini.message_id = send_receipt.message_id;
+  mini.transaction_id = send_receipt.transaction_id;
+  mini.target = send_receipt.target;
+  auto& impl = dynamic_cast<TransactionImpl&>(transaction);
+  impl.appendMiniTransaction(mini);
 }
 
 void ProducerImpl::getPublishInfoAsync(const std::string& topic, const 
PublishInfoCallback& cb) {
@@ -553,12 +560,14 @@ void 
ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
   if (transaction_checker_) {
     std::weak_ptr<ProducerImpl> producer(shared_from_this());
 
-    auto transaction = absl::make_unique<TransactionImpl>(message->topic(), 
message->id(),
-                                                          
message->traceContext().value_or(""), producer);
-    transaction->endpoint(message->extension().target_endpoint);
-    transaction->transactionId(message->extension().transaction_id);
+    MiniTransaction transaction = {};
+    transaction.topic = message->topic();
+    transaction.message_id = message->id();
+    transaction.transaction_id = message->extension().transaction_id;
+    transaction.trace_context = message->traceContext().value_or("");
+    transaction.target = message->extension().target_endpoint;
     TransactionState state = transaction_checker_(*message);
-    endTransaction0(*transaction, state);
+    endTransaction0(transaction, state);
   } else {
     SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
   }
diff --git a/cpp/source/rocketmq/TransactionImpl.cpp 
b/cpp/source/rocketmq/TransactionImpl.cpp
index 2d2bf66..0f1dcd9 100644
--- a/cpp/source/rocketmq/TransactionImpl.cpp
+++ b/cpp/source/rocketmq/TransactionImpl.cpp
@@ -26,7 +26,14 @@ bool TransactionImpl::commit() {
     return false;
   }
 
-  return producer->commit(*this);
+  bool result = true;
+  {
+    absl::MutexLock lk(&pending_transactions_mtx_);
+    for (const auto& mini : pending_transactions_) {
+      result &= producer->endTransaction0(mini, TransactionState::COMMIT);
+    }
+  }
+  return result;
 }
 
 bool TransactionImpl::rollback() {
@@ -34,15 +41,15 @@ bool TransactionImpl::rollback() {
   if (!producer) {
     return false;
   }
-  return producer->rollback(*this);
-}
 
-const std::string& TransactionImpl::messageId() const {
-  return message_id_;
-}
-
-const std::string& TransactionImpl::transactionId() const {
-  return transaction_id_;
+  bool result = true;
+  {
+    absl::MutexLock lk(&pending_transactions_mtx_);
+    for (const auto& mini : pending_transactions_) {
+      result &= producer->endTransaction0(mini, TransactionState::ROLLBACK);
+    }
+  }
+  return result;
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index d3d865c..ad9b24d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -59,11 +59,12 @@ public:
 
   void setTransactionChecker(TransactionChecker checker);
 
-  std::unique_ptr<TransactionImpl> prepare(MessageConstPtr message, 
std::error_code& ec);
-
-  bool commit(const Transaction& transaction);
+  std::unique_ptr<TransactionImpl> beginTransaction() {
+    auto producer = std::weak_ptr<ProducerImpl>(shared_from_this());
+    return absl::make_unique<TransactionImpl>(producer);
+  }
 
-  bool rollback(const Transaction& transaction);
+  void send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
   /**
    * Check if the RPC client for the target host is isolated or not
@@ -108,7 +109,11 @@ public:
 
   void topicsOfInterest(std::vector<std::string> topics) override 
LOCKS_EXCLUDED(topics_mtx_);
 
-  const PublishStats& stats() const { return stats_; }
+  const PublishStats& stats() const {
+    return stats_;
+  }
+
+  bool endTransaction0(const MiniTransaction& transaction, TransactionState 
resolution);
 
 protected:
   std::shared_ptr<ClientImpl> self() override {
@@ -155,8 +160,6 @@ private:
 
   void send0(MessageConstPtr message, SendCallback callback, 
std::vector<rmq::MessageQueue> list);
 
-  bool endTransaction0(const Transaction& transaction, TransactionState 
resolution);
-
   void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) 
LOCKS_EXCLUDED(isolated_endpoints_mtx_);
 
   friend class ProducerBuilder;
diff --git a/cpp/source/rocketmq/include/TransactionImpl.h 
b/cpp/source/rocketmq/include/TransactionImpl.h
index ce633ed..22e26d5 100644
--- a/cpp/source/rocketmq/include/TransactionImpl.h
+++ b/cpp/source/rocketmq/include/TransactionImpl.h
@@ -19,6 +19,8 @@
 #include <memory>
 #include <string>
 
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Transaction.h"
 
@@ -26,10 +28,17 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class ProducerImpl;
 
+struct MiniTransaction {
+  std::string topic;
+  std::string message_id;
+  std::string transaction_id;
+  std::string trace_context;
+  std::string target;
+};
+
 class TransactionImpl : public Transaction {
 public:
-  TransactionImpl(std::string topic, std::string message_id, std::string 
trace_context, const std::weak_ptr<ProducerImpl>& producer)
-      : topic_(std::move(topic)), message_id_(std::move(message_id)), 
trace_context_(std::move(trace_context)), producer_(producer) {
+  TransactionImpl(const std::weak_ptr<ProducerImpl>& producer) : 
producer_(producer) {
   }
 
   ~TransactionImpl() override = default;
@@ -38,39 +47,15 @@ public:
 
   bool rollback() override;
 
-  const std::string& messageId() const override;
-
-  const std::string& transactionId() const override;
-
-  void transactionId(std::string transaction_id) {
-    transaction_id_ = std::move(transaction_id);
-  }
-
-  const std::string& traceContext() const override {
-    return trace_context_;
-  }
-
-  void traceContext(std::string trace_context) {
-    trace_context_ = std::move(trace_context);
-  }
-
-  const std::string& endpoint() const override {
-    return endpoint_;
-  }
-
-  void endpoint(std::string endpoint) { endpoint_ = std::move(endpoint); }
-
-  const std::string& topic() const override {
-    return topic_;
+  void appendMiniTransaction(MiniTransaction mini_transaction) 
LOCKS_EXCLUDED(pending_transactions_mtx_) {
+    absl::MutexLock lk(&pending_transactions_mtx_);
+    pending_transactions_.emplace_back(std::move(mini_transaction));
   }
 
 private:
-  std::string topic_;
-  std::string message_id_;
-  std::string transaction_id_;
-  std::string endpoint_;
-  std::string trace_context_;
   std::weak_ptr<ProducerImpl> producer_;
+  std::vector<MiniTransaction> pending_transactions_ 
GUARDED_BY(pending_transactions_mtx_);
+  absl::Mutex pending_transactions_mtx_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/scheduler/SchedulerImpl.cpp 
b/cpp/source/scheduler/SchedulerImpl.cpp
index 7eb4da3..1c036df 100644
--- a/cpp/source/scheduler/SchedulerImpl.cpp
+++ b/cpp/source/scheduler/SchedulerImpl.cpp
@@ -77,7 +77,7 @@ void SchedulerImpl::start() {
 #endif
 
           if (State::STARTED != state_.load(std::memory_order_relaxed)) {
-            SPDLOG_INFO("One scheduler worker thread quit");
+            SPDLOG_DEBUG("One scheduler worker thread quit");
             break;
           }
         }

Reply via email to