This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 739f754 Prepare to sync examples (#66)
739f754 is described below
commit 739f754cd04a79f97dbbdb1bc2b8e09064c33878
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 21 09:26:40 2022 +0800
Prepare to sync examples (#66)
* Prepare to sync examples
* Make topic, service access point, message body size configurable through
command option
* Add an example producer, publishing timed message
* Add transactional example
* Update README
---
cpp/README.md | 41 ++++-
cpp/bazel/rocketmq_deps.bzl | 189 +++++++++++----------
cpp/examples/BUILD.bazel | 105 ++++--------
cpp/examples/BenchmarkPushConsumer.cpp | 83 ---------
cpp/examples/ExampleFifoPushConsumer.cpp | 67 --------
cpp/examples/ExampleProducer.cpp | 38 +++--
...ncProducer.cpp => ExampleProducerWithAsync.cpp} | 32 ++--
...ucer.cpp => ExampleProducerWithFifoMessage.cpp} | 29 +++-
...cer.cpp => ExampleProducerWithTimedMessage.cpp} | 38 +++--
...=> ExampleProducerWithTransactionalMessage.cpp} | 65 +++----
cpp/examples/ExampleTransactionProducer.cpp | 52 ------
cpp/examples/PushConsumerWithCustomExecutor.cpp | 147 ----------------
cpp/examples/PushConsumerWithThrottle.cpp | 88 ----------
cpp/examples/SqlConsumer.cpp | 61 -------
cpp/examples/SqlProducer.cpp | 60 -------
cpp/include/rocketmq/Message.h | 9 +
cpp/include/rocketmq/Producer.h | 5 +
cpp/include/rocketmq/SendReceipt.h | 2 +
cpp/include/rocketmq/Transaction.h | 10 --
cpp/source/base/Message.cpp | 5 +
cpp/source/base/ThreadPoolImpl.cpp | 2 +-
cpp/source/client/ClientManagerImpl.cpp | 17 +-
cpp/source/client/TelemetryBidiReactor.cpp | 5 +-
cpp/source/rocketmq/Producer.cpp | 9 +
cpp/source/rocketmq/ProducerImpl.cpp | 79 +++++----
cpp/source/rocketmq/TransactionImpl.cpp | 25 ++-
cpp/source/rocketmq/include/ProducerImpl.h | 17 +-
cpp/source/rocketmq/include/TransactionImpl.h | 47 ++---
cpp/source/scheduler/SchedulerImpl.cpp | 2 +-
29 files changed, 432 insertions(+), 897 deletions(-)
diff --git a/cpp/README.md b/cpp/README.md
index 705abe2..fa92392 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -48,16 +48,53 @@ if "com_google_googletest" not in native.existing_rules():
1. Build
- From the repository root,
+ From the workspace,
```
bazel build //...
```
2. Run Unit Tests
- From the repository root,
+ From the workspace,
```
bazel test //...
```
+3. Run Examples
+ From the workspace,
+
+ Publish standard messages to your topic synchronously
+ ```
+ bazel run //examples:example_producer -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
+ ```
+ where `1024` is the message body size, in bytes
+
+ ------------
+
+
+ Publish standard messages to your topic asynchronously
+ ```
+ bazel run //examples:example_producer_with_async -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
+ ```
+ where `1024` is the message body size, in bytes
+
+ ------------
+
+
+ Publish FIFO messages to your topic
+ ```
+ bazel run //examples:example_producer_with_fifo_message --
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024
--total=16
+ ```
+ where `1024` is the message body size, in bytes
+
+ -----------
+
+ Publish Transactional messages
+ ```
+ bazel run //examples:example_producer_with_transactional_message --
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024
--total=16
+ ```
+ where `1024` is the message body size, in bytes
+
+
+
### IDE
[Visual Studio Code](https://code.visualstudio.com/) +
[Clangd](https://clangd.llvm.org/) is the recommended development toolset.
1. VSCode + Clangd
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 939b97c..9306f94 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -8,86 +8,75 @@ def rocketmq_deps():
name = "opentelementry_api",
actual = "@com_github_opentelemetry//api:api",
)
+
+ maybe(
+ http_archive,
+ name = "com_google_googletest",
+ sha256 =
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
+ strip_prefix = "googletest-release-1.11.0",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
+
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
+ ],
+ )
+
+ maybe(
+ http_archive,
+ name = "com_github_gulrak_filesystem",
+ strip_prefix = "filesystem-1.5.0",
+ sha256 =
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
+ "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
+ ],
+ build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
+ )
+
+ maybe(
+ http_archive,
+ name = "com_github_gabime_spdlog",
+ strip_prefix = "spdlog-1.9.2",
+ sha256 =
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
+ "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
+ ],
+ build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
+ )
+
+ maybe(
+ http_archive,
+ name = "com_github_fmtlib_fmt",
+ strip_prefix = "fmt-8.0.1",
+ sha256 =
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
+ "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
+ ],
+ build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
+ )
+
+ maybe(
+ http_archive,
+ name = "com_google_protobuf",
+ sha256 =
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
+ strip_prefix = "protobuf-3.20.1",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz",
+
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz",
+ ],
+ )
- if "rules_python" not in native.existing_rules():
- http_archive(
- name = "rules_python",
- sha256 =
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
- strip_prefix = "rules_python-0.8.1",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz",
-
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz",
- ],
- )
-
- if "com_google_googletest" not in native.existing_rules():
- http_archive(
- name = "com_google_googletest",
- sha256 =
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
- strip_prefix = "googletest-release-1.11.0",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
-
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
- ],
- )
-
- if "com_github_gulrak_filesystem" not in native.existing_rules():
- http_archive(
- name = "com_github_gulrak_filesystem",
- strip_prefix = "filesystem-1.5.0",
- sha256 =
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
- "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
- ],
- build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
- )
-
- if "com_github_gabime_spdlog" not in native.existing_rules():
- http_archive(
- name = "com_github_gabime_spdlog",
- strip_prefix = "spdlog-1.9.2",
- sha256 =
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
-
"https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
- ],
- build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
- )
-
- if "com_github_fmtlib_fmt" not in native.existing_rules():
- http_archive(
- name = "com_github_fmtlib_fmt",
- strip_prefix = "fmt-8.0.1",
- sha256 =
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
- "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
- ],
- build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
- )
-
- if "com_google_protobuf" not in native.existing_rules():
- http_archive(
- name = "com_google_protobuf",
- sha256 =
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
- strip_prefix = "protobuf-3.20.1",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz",
-
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz",
- ],
- )
-
- if "rules_proto_grpc" not in native.existing_rules():
- http_archive(
- name = "rules_proto_grpc",
- sha256 =
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
- strip_prefix = "rules_proto_grpc-4.1.1",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz",
-
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz"
- ],
- )
+ maybe(
+ http_archive,
+ name = "rules_proto_grpc",
+ sha256 =
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
+ strip_prefix = "rules_proto_grpc-4.1.1",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz",
+
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz"
+ ],
+ )
maybe(
http_archive,
@@ -100,16 +89,27 @@ def rocketmq_deps():
strip_prefix = "opencensus-cpp-0.4.1",
)
- if "com_google_absl" not in native.existing_rules():
- http_archive(
- name = "com_google_absl",
- sha256 =
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
- strip_prefix = "abseil-cpp-20211102.0",
- urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz",
-
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz",
- ],
- )
+ maybe(
+ http_archive,
+ name = "com_google_absl",
+ sha256 =
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
+ strip_prefix = "abseil-cpp-20211102.0",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz",
+
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz",
+ ],
+ )
+
+ maybe(
+ http_archive,
+ name = "com_github_gflags_gflags",
+ strip_prefix = "gflags-2.2.2",
+ sha256 =
"34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz",
+ "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz",
+ ]
+ )
maybe(
http_archive,
@@ -145,6 +145,17 @@ def rocketmq_deps():
strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
)
+ maybe(
+ http_archive,
+ name = "rules_python",
+ sha256 =
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
+ strip_prefix = "rules_python-0.8.1",
+ urls = [
+
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz",
+
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz",
+ ],
+ )
+
maybe(
http_archive,
name = "rules_swift",
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index c2dbab4..939f0a2 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -23,26 +23,51 @@ cc_binary(
],
deps = [
"//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
],
)
cc_binary(
- name = "example_fifo_producer",
+ name = "example_producer_with_fifo_message",
srcs = [
- "ExampleFifoProducer.cpp",
+ "ExampleProducerWithFifoMessage.cpp",
],
deps = [
"//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
],
)
cc_binary(
- name = "example_async_producer",
+ name = "example_producer_with_async",
srcs = [
- "ExampleAsyncProducer.cpp",
+ "ExampleProducerWithAsync.cpp",
],
deps = [
"//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
+ ],
+)
+
+cc_binary(
+ name = "example_producer_with_timed_message",
+ srcs = [
+ "ExampleProducerWithTimedMessage.cpp",
+ ],
+ deps = [
+ "//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
+ ],
+)
+
+cc_binary(
+ name = "example_producer_with_transactional_message",
+ srcs = [
+ "ExampleProducerWithTransactionalMessage.cpp",
+ ],
+ deps = [
+ "//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
],
)
@@ -56,16 +81,6 @@ cc_binary(
],
)
-# cc_binary(
-# name = "sql_producer",
-# srcs = [
-# "SqlProducer.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
cc_binary(
name = "example_push_consumer",
srcs = [
@@ -74,64 +89,4 @@ cc_binary(
deps = [
"//source/rocketmq:rocketmq_library",
],
-)
-
-# cc_binary(
-# name = "example_fifo_push_consumer",
-# srcs = [
-# "ExampleFifoPushConsumer.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
-# cc_binary(
-# name = "push_consumer_with_custom_executor",
-# srcs = [
-# "PushConsumerWithCustomExecutor.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
-# cc_binary(
-# name = "push_consumer_with_throttle",
-# srcs = [
-# "PushConsumerWithThrottle.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
-# cc_binary(
-# name = "sql_consumer",
-# srcs = [
-# "SqlConsumer.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
-# cc_binary(
-# name = "benchmark_push_consumer",
-# srcs = [
-# "BenchmarkPushConsumer.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
-
-# cc_binary(
-# name = "example_transaction_producer",
-# srcs = [
-# "ExampleTransactionProducer.cpp",
-# ],
-# deps = [
-# "//source/rocketmq:rocketmq_library",
-# ],
-# )
\ No newline at end of file
+)
\ No newline at end of file
diff --git a/cpp/examples/BenchmarkPushConsumer.cpp
b/cpp/examples/BenchmarkPushConsumer.cpp
deleted file mode 100644
index 400fa80..0000000
--- a/cpp/examples/BenchmarkPushConsumer.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-using namespace rocketmq;
-
-class CounterMessageListener : public StandardMessageListener {
-public:
- explicit CounterMessageListener(std::atomic_long& counter) :
counter_(counter) {
- }
-
- ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs)
override {
- counter_.fetch_add(msgs.size());
- return ConsumeMessageResult::SUCCESS;
- }
-
-private:
- std::atomic_long& counter_;
-};
-
-int main(int argc, char* argv[]) {
-
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- std::atomic_long counter(0);
-
- DefaultMQPushConsumer push_consumer("CID_sample");
- MessageListener* listener = new CounterMessageListener(counter);
-
- push_consumer.setGroupName("CID_sample");
- push_consumer.setInstanceName("CID_sample_member_0");
- push_consumer.subscribe("TopicTest", "*");
- push_consumer.setNamesrvAddr("11.167.164.105:9876");
- push_consumer.registerMessageListener(listener);
- push_consumer.start();
-
- std::atomic_bool stopped(false);
- std::thread report_thread([&counter, &stopped]() {
- while (!stopped) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- long qps;
- while (true) {
- qps = counter.load(std::memory_order_relaxed);
- if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
- break;
- }
- }
- std::cout << "QPS: " << qps << std::endl;
- }
- });
-
- std::this_thread::sleep_for(std::chrono::minutes(30));
- stopped.store(true);
-
- if (report_thread.joinable()) {
- report_thread.join();
- }
-
- push_consumer.shutdown();
- return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/ExampleFifoPushConsumer.cpp
b/cpp/examples/ExampleFifoPushConsumer.cpp
deleted file mode 100644
index df039d7..0000000
--- a/cpp/examples/ExampleFifoPushConsumer.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-#include "rocketmq/Logger.h"
-
-#include "rocketmq/MessageListener.h"
-#include "spdlog/spdlog.h"
-
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-using namespace rocketmq;
-
-class SampleMQMessageListener : public FifoMessageListener {
-public:
- ConsumeMessageResult consumeMessage(const MQMessageExt& message) override {
- SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK",
message.getTopic(), message.getMsgId());
- std::cout << "Consume Message[MsgId=" << message.getMsgId() << "] OK. Body
Size: " << message.getBody().size()
- << std::endl;
- // std::this_thread::sleep_for(std::chrono::seconds(1));
- return ConsumeMessageResult::SUCCESS;
- }
-};
-
-int main(int argc, char* argv[]) {
-
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- const char* group_id = "GID_lingchu_test_order";
- const char* topic = "lingchu_test_order_topic";
- const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
-
- DefaultMQPushConsumer push_consumer(group_id);
- push_consumer.setResourceNamespace(resource_namespace);
-
push_consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
- push_consumer.setNamesrvAddr("120.25.100.131:8081");
- FifoMessageListener* listener = new SampleMQMessageListener();
- push_consumer.setInstanceName("instance_0");
- push_consumer.subscribe(topic, "*");
- push_consumer.registerMessageListener(listener);
- push_consumer.setConsumeThreadCount(4);
- push_consumer.start();
-
- std::this_thread::sleep_for(std::chrono::minutes(30));
-
- push_consumer.shutdown();
- return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 9bdb9b4..b82cfce 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -18,8 +18,10 @@
#include <atomic>
#include <iostream>
#include <random>
+#include <string>
#include <system_error>
+#include "gflags/gflags.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -46,12 +48,17 @@ std::string randomString(std::string::size_type len) {
return result;
}
+DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
int main(int argc, char* argv[]) {
- const char* topic = "lingchu_normal_topic";
- const char* name_server = "121.196.167.124:8081";
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
- auto producer =
-
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+ auto producer = Producer::newBuilder()
+
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .build();
std::atomic_bool stopped;
std::atomic_long count(0);
@@ -69,16 +76,25 @@ int main(int argc, char* argv[]) {
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024 * 4);
- std::cout << "Message body size: " << body.length() << std::endl;
+ std::string body = randomString(FLAGS_message_body_size);
try {
- for (int i = 0; i < 256; ++i) {
- auto message =
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
+ auto message = Message::newBuilder()
+ .withTopic(FLAGS_topic)
+ .withTag("TagA")
+ .withKeys({"Key-" + std::to_string(i)})
+ .withBody(body)
+ .build();
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
- std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
- count++;
+ if (ec) {
+ std::cerr << "Failed to publish message to " << FLAGS_topic << ".
Cause: " << ec.message() << std::endl;
+ } else {
+ std::cout << "Publish message to " << FLAGS_topic << " OK. Message-ID:
" << send_receipt.message_id
+ << std::endl;
+ count++;
+ }
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
@@ -88,7 +104,5 @@ int main(int argc, char* argv[]) {
stats_thread.join();
}
- // std::this_thread::sleep_for(std::chrono::seconds(1));
-
return EXIT_SUCCESS;
}
\ No newline at end of file
diff --git a/cpp/examples/ExampleAsyncProducer.cpp
b/cpp/examples/ExampleProducerWithAsync.cpp
similarity index 73%
copy from cpp/examples/ExampleAsyncProducer.cpp
copy to cpp/examples/ExampleProducerWithAsync.cpp
index 2d28d74..0463cdd 100644
--- a/cpp/examples/ExampleAsyncProducer.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -20,8 +20,10 @@
#include <iostream>
#include <mutex>
#include <random>
+#include <string>
#include <system_error>
+#include "gflags/gflags.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -48,12 +50,16 @@ std::string randomString(std::string::size_type len) {
return result;
}
-int main(int argc, char* argv[]) {
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "11.166.42.94:8081";
+DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
- auto producer =
-
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+int main(int argc, char* argv[]) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+ auto producer = Producer::newBuilder()
+
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .build();
std::atomic_bool stopped;
std::atomic_long count(0);
@@ -71,10 +77,9 @@ int main(int argc, char* argv[]) {
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024 * 4);
+ std::string body = randomString(FLAGS_message_body_size);
std::cout << "Message body size: " << body.length() << std::endl;
- std::size_t total = 256;
std::size_t completed = 0;
std::mutex mtx;
std::condition_variable cv;
@@ -85,19 +90,24 @@ int main(int argc, char* argv[]) {
completed++;
count++;
std::cout << "Message[id=" << receipt.message_id << "] sent" <<
std::endl;
- if (completed >= total) {
+ if (completed >= FLAGS_total) {
cv.notify_all();
}
};
- for (std::size_t i = 0; i < total; ++i) {
- auto message =
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
+ auto message = Message::newBuilder()
+ .withTopic(FLAGS_topic)
+ .withTag("TagA")
+ .withKeys({"Key-" + std::to_string(i)})
+ .withBody(body)
+ .build();
producer.send(std::move(message), send_callback);
}
{
std::unique_lock<std::mutex> lk(mtx);
- cv.wait(lk, [&]() { return completed >= total; });
+ cv.wait(lk, [&]() { return completed >= FLAGS_total; });
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/examples/ExampleFifoProducer.cpp
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
similarity index 73%
copy from cpp/examples/ExampleFifoProducer.cpp
copy to cpp/examples/ExampleProducerWithFifoMessage.cpp
index 947ae5d..e522ad4 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -18,8 +18,10 @@
#include <atomic>
#include <iostream>
#include <random>
+#include <string>
#include <system_error>
+#include "gflags/gflags.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -46,12 +48,23 @@ std::string randomString(std::string::size_type len) {
return result;
}
+DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
int main(int argc, char* argv[]) {
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "11.166.42.94:8081";
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ // Adjust log level for file/console sinks
+ auto& logger = getLogger();
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
+ logger.init();
- auto producer =
-
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+ auto producer = Producer::newBuilder()
+
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .build();
std::atomic_bool stopped;
std::atomic_long count(0);
@@ -69,17 +82,17 @@ int main(int argc, char* argv[]) {
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024 * 4);
+ std::string body = randomString(FLAGS_message_body_size);
std::cout << "Message body size: " << body.length() << std::endl;
try {
- for (int i = 0; i < 256; ++i) {
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
auto message = Message::newBuilder()
- .withTopic(topic)
+ .withTopic(FLAGS_topic)
.withTag("TagA")
.withKeys({"Key-0"})
.withBody(body)
- .withGroup("message-group-0")
+ .withGroup("message-group" + std::to_string(i % 10))
.build();
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
diff --git a/cpp/examples/ExampleFifoProducer.cpp
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
similarity index 67%
rename from cpp/examples/ExampleFifoProducer.cpp
rename to cpp/examples/ExampleProducerWithTimedMessage.cpp
index 947ae5d..c44da05 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -16,10 +16,14 @@
*/
#include <algorithm>
#include <atomic>
+#include <chrono>
+#include <cstddef>
#include <iostream>
#include <random>
+#include <string>
#include <system_error>
+#include "gflags/gflags.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -46,12 +50,22 @@ std::string randomString(std::string::size_type len) {
return result;
}
+DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
int main(int argc, char* argv[]) {
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "11.166.42.94:8081";
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ auto& logger = getLogger();
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
+ logger.init();
- auto producer =
-
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+ auto producer = Producer::newBuilder()
+
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .build();
std::atomic_bool stopped;
std::atomic_long count(0);
@@ -69,17 +83,18 @@ int main(int argc, char* argv[]) {
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024 * 4);
- std::cout << "Message body size: " << body.length() << std::endl;
+ std::string body = randomString(FLAGS_message_body_size);
try {
- for (int i = 0; i < 256; ++i) {
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
auto message = Message::newBuilder()
- .withTopic(topic)
+ .withTopic(FLAGS_topic)
.withTag("TagA")
- .withKeys({"Key-0"})
+ .withKeys({"Key-" + std::to_string(i)})
.withBody(body)
- .withGroup("message-group-0")
+ .availableAfter(
+ std::chrono::system_clock::now() +
+ std::chrono::seconds(10)) // This message would
be available to consumers after 10 seconds
.build();
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
@@ -89,11 +104,12 @@ int main(int argc, char* argv[]) {
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
}
-
stopped.store(true, std::memory_order_relaxed);
if (stats_thread.joinable()) {
stats_thread.join();
}
+ // std::this_thread::sleep_for(std::chrono::seconds(1));
+
return EXIT_SUCCESS;
}
\ No newline at end of file
diff --git a/cpp/examples/ExampleAsyncProducer.cpp
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
similarity index 59%
rename from cpp/examples/ExampleAsyncProducer.cpp
rename to cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index 2d28d74..c740533 100644
--- a/cpp/examples/ExampleAsyncProducer.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -16,12 +16,12 @@
*/
#include <algorithm>
#include <atomic>
-#include <condition_variable>
#include <iostream>
-#include <mutex>
#include <random>
+#include <string>
#include <system_error>
+#include "gflags/gflags.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -48,12 +48,29 @@ std::string randomString(std::string::size_type len) {
return result;
}
+DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+
int main(int argc, char* argv[]) {
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "11.166.42.94:8081";
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ auto& logger = getLogger();
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
+ logger.init();
- auto producer =
-
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+ auto checker = [](const Message& message) -> TransactionState {
+ std::cout << "Recovery orphan transactional message[topic=" <<
message.topic() << ", MsgId=" << message.id()
+ << ", txn-id=" << message.extension().transaction_id <<
std::endl;
+ return TransactionState::COMMIT;
+ };
+
+ auto producer = Producer::newBuilder()
+
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withTransactionChecker(checker)
+ .build();
std::atomic_bool stopped;
std::atomic_long count(0);
@@ -71,33 +88,21 @@ int main(int argc, char* argv[]) {
std::thread stats_thread(stats_lambda);
- std::string body = randomString(1024 * 4);
- std::cout << "Message body size: " << body.length() << std::endl;
-
- std::size_t total = 256;
- std::size_t completed = 0;
- std::mutex mtx;
- std::condition_variable cv;
+ std::string body = randomString(FLAGS_message_body_size);
try {
- auto send_callback = [&](const std::error_code& ec, const SendReceipt& receipt) {
- std::unique_lock<std::mutex> lk(mtx);
- completed++;
- count++;
- std::cout << "Message[id=" << receipt.message_id << "] sent" << std::endl;
- if (completed >= total) {
- cv.notify_all();
- }
- };
+ auto message = Message::newBuilder().withTopic(FLAGS_topic).withTag("TagA").withBody(body).build();
+ auto transaction = producer.beginTransaction();
+ std::error_code ec;
- for (std::size_t i = 0; i < total; ++i) {
- auto message = Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
- producer.send(std::move(message), send_callback);
- }
+ producer.send(std::move(message), ec, *transaction);
- {
- std::unique_lock<std::mutex> lk(mtx);
- cv.wait(lk, [&]() { return completed >= total; });
+ if (!ec) {
+ if (!transaction->commit()) {
+ std::cerr << "Failed to commit message" << std::endl;
+ }
+ } else {
+ std::cerr << "Failed to send transactional message to topic: " << FLAGS_topic << std::endl;
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
@@ -107,7 +112,5 @@ int main(int argc, char* argv[]) {
stats_thread.join();
}
- std::this_thread::sleep_for(std::chrono::seconds(1));
-
return EXIT_SUCCESS;
}
\ No newline at end of file
diff --git a/cpp/examples/ExampleTransactionProducer.cpp b/cpp/examples/ExampleTransactionProducer.cpp
deleted file mode 100644
index 1041b92..0000000
--- a/cpp/examples/ExampleTransactionProducer.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQProducer.h"
-#include <cstdlib>
-
-using namespace ROCKETMQ_NAMESPACE;
-
-int main(int argc, char* argv[]) {
- DefaultMQProducer producer("TestGroup");
-
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "47.98.116.189:80";
-
- producer.setNamesrvAddr(name_server);
- producer.compressBodyThreshold(256);
- const char* resource_namespace = "MQ_INST_1080056302921134_BXuIbML7";
- producer.setRegion("cn-hangzhou-pre");
- producer.setResourceNamespace(resource_namespace);
- producer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
-
- MQMessage message;
- message.setTopic(topic);
- message.setTags("TagA");
- message.setKey("Yuck! Why-plural?");
- message.setBody("ABC");
-
- producer.start();
-
- auto transaction = producer.prepare(message);
-
- transaction->commit();
-
- std::this_thread::sleep_for(std::chrono::minutes(30));
-
- producer.shutdown();
-
- return EXIT_SUCCESS;
-}
\ No newline at end of file
diff --git a/cpp/examples/PushConsumerWithCustomExecutor.cpp b/cpp/examples/PushConsumerWithCustomExecutor.cpp
deleted file mode 100644
index 82dd57d..0000000
--- a/cpp/examples/PushConsumerWithCustomExecutor.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-#include "rocketmq/State.h"
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ExecutorImpl {
-public:
- ExecutorImpl() : state_(State::CREATED) {
- }
-
- virtual ~ExecutorImpl() {
- switch (state_.load(std::memory_order_relaxed)) {
- case CREATED:
- case STOPPING:
- case STOPPED:
- break;
-
- case STARTING:
- case STARTED:
- state_.store(State::STOPPED);
- if (worker_.joinable()) {
- worker_.join();
- }
- break;
- }
- }
-
- void submit(const std::function<void(void)>& task) {
- if (State::STOPPED == state_.load(std::memory_order_relaxed)) {
- return;
- }
-
- {
- std::unique_lock<std::mutex> lock(task_mtx_);
- tasks_.push_back(task);
- }
- cv_.notify_one();
- }
-
- void start() {
- State expected = State::CREATED;
- if (state_.compare_exchange_strong(expected, State::STARTING)) {
- worker_ = std::thread(std::bind(&ExecutorImpl::loop, this));
- state_.store(State::STARTED);
- }
- }
-
- void stop() {
- state_.store(State::STOPPED);
- if (worker_.joinable()) {
- worker_.join();
- }
- }
-
-private:
- void loop() {
- while (state_.load(std::memory_order_relaxed) != State::STOPPED) {
- std::function<void(void)> func;
- {
- std::unique_lock<std::mutex> lk(task_mtx_);
- if (!tasks_.empty()) {
- func = tasks_.back();
- }
- }
-
- if (func) {
- func();
- } else {
- std::unique_lock<std::mutex> lk(task_mtx_);
- cv_.wait_for(lk, std::chrono::seconds(3),
- [&]() { return state_.load(std::memory_order_relaxed) == State::STOPPED || !tasks_.empty(); });
- }
- }
- }
-
- std::atomic<State> state_;
- std::vector<std::function<void(void)>> tasks_;
- std::mutex task_mtx_;
- std::condition_variable cv_;
- std::thread worker_;
-};
-
-class SampleMQMessageListener : public StandardMessageListener {
-public:
- ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
- std::lock_guard<std::mutex> lk(console_mtx_);
- for (const MQMessageExt& msg : msgs) {
- std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() << ", Body=" << msg.getBody()
- << std::endl;
- }
- return ConsumeMessageResult::SUCCESS;
- }
-
-private:
- std::mutex console_mtx_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
- using namespace ROCKETMQ_NAMESPACE;
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- DefaultMQPushConsumer push_consumer("TestGroup");
- MessageListener* listener = new SampleMQMessageListener;
-
- auto pool = new ExecutorImpl;
- pool->start();
- push_consumer.setCustomExecutor(std::bind(&ExecutorImpl::submit, pool, std::placeholders::_1));
- push_consumer.setGroupName("TestGroup");
- push_consumer.setInstanceName("CID_sample_member_0");
- push_consumer.subscribe("TestTopic", "*");
- push_consumer.setNamesrvAddr("11.167.164.105:9876");
- push_consumer.registerMessageListener(listener);
- push_consumer.start();
-
- std::this_thread::sleep_for(std::chrono::minutes(30));
- pool->stop();
- delete pool;
-
- push_consumer.shutdown();
- return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/PushConsumerWithThrottle.cpp b/cpp/examples/PushConsumerWithThrottle.cpp
deleted file mode 100644
index c03b6c5..0000000
--- a/cpp/examples/PushConsumerWithThrottle.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class CounterMessageListener : public StandardMessageListener {
-public:
- explicit CounterMessageListener(std::atomic_long& counter) : counter_(counter) {
- }
-
- ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
- counter_.fetch_add(msgs.size());
- return ConsumeMessageResult::SUCCESS;
- }
-
-private:
- std::atomic_long& counter_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
-
- using namespace ROCKETMQ_NAMESPACE;
-
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- std::atomic_long counter(0);
-
- DefaultMQPushConsumer push_consumer("TestGroup");
- MessageListener* listener = new CounterMessageListener(counter);
-
- push_consumer.setGroupName("TestGroup");
- push_consumer.setInstanceName("CID_sample_member_0");
- push_consumer.subscribe("TestTopic", "*");
- push_consumer.setNamesrvAddr("11.167.164.105:9876");
- push_consumer.registerMessageListener(listener);
- push_consumer.setThrottle("TestTopic", 20);
- push_consumer.start();
-
- std::atomic_bool stopped(false);
- std::thread report_thread([&counter, &stopped]() {
- while (!stopped) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- long qps;
- while (true) {
- qps = counter.load(std::memory_order_relaxed);
- if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
- break;
- }
- }
- std::cout << "QPS: " << qps << std::endl;
- }
- });
-
- std::this_thread::sleep_for(std::chrono::seconds(10));
- stopped.store(true);
-
- if (report_thread.joinable()) {
- report_thread.join();
- }
-
- push_consumer.shutdown();
- return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/SqlConsumer.cpp b/cpp/examples/SqlConsumer.cpp
deleted file mode 100644
index c07e4a0..0000000
--- a/cpp/examples/SqlConsumer.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-using namespace rocketmq;
-
-class SampleMQMessageListener : public StandardMessageListener {
-public:
- ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override {
- std::lock_guard<std::mutex> lk(console_mtx_);
- for (const MQMessageExt& msg : msgs) {
- std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() << ", Tag=" << msg.getTags()
- << ", a=" << msg.getProperty("a") << ", Body=" << msg.getBody() << std::endl;
- }
- return ConsumeMessageResult::SUCCESS;
- }
-
-private:
- std::mutex console_mtx_;
-};
-
-int main(int argc, char* argv[]) {
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- DefaultMQPushConsumer push_consumer("TestGroup");
- MessageListener* listener = new SampleMQMessageListener;
-
- push_consumer.setGroupName("TestGroup");
- push_consumer.setInstanceName("CID_sample_member_0");
- std::string sql_filter("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)");
- push_consumer.subscribe("TestTopic", sql_filter, ExpressionType::SQL92);
- // push_consumer.setNamesrvAddr("11.167.164.105:9876");
- push_consumer.registerMessageListener(listener);
- push_consumer.start();
-
- std::this_thread::sleep_for(std::chrono::seconds(30));
-
- push_consumer.shutdown();
- return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/SqlProducer.cpp b/cpp/examples/SqlProducer.cpp
deleted file mode 100644
index bc0fb47..0000000
--- a/cpp/examples/SqlProducer.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQProducer.h"
-#include <iostream>
-
-using namespace rocketmq;
-
-int main(int argc, char* argv[]) {
-
- Logger& logger = getLogger();
- logger.setLevel(Level::Debug);
- logger.init();
-
- DefaultMQProducer producer("PID_sample");
- producer.setNamesrvAddr("11.167.164.105:9876");
-
- MQMessage message;
- message.setTopic("TestTopic");
- try {
- producer.start();
- for (int i = 0; i < 8; ++i) {
- std::string body = std::to_string(i);
- message.setBody(body);
- message.setProperty("a", std::to_string(i % 5));
- switch (i % 3) {
- case 0:
- message.setTags("TagA");
- break;
- case 1:
- message.setTags("TagB");
- break;
- case 2:
- message.setTags("TagC");
- break;
- }
- SendResult sendResult = producer.send(message);
- std::cout << "Message sent with msgId=" << sendResult.getMsgId()
- << ", Queue=" << sendResult.getMessageQueue().simpleName() << std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- } catch (...) {
- std::cerr << "Ah...No!!!" << std::endl;
- }
- producer.shutdown();
- return EXIT_SUCCESS;
-}
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h
index 642f322..73336dc 100644
--- a/cpp/include/rocketmq/Message.h
+++ b/cpp/include/rocketmq/Message.h
@@ -48,6 +48,7 @@ struct Extension {
std::int64_t offset{0};
std::string nonce;
std::string transaction_id;
+ bool transactional{false};
};
class Message {
@@ -163,6 +164,14 @@ public:
MessageBuilder& withProperties(std::unordered_map<std::string, std::string> properties);
+ /**
+ * @brief Specify timepoint after which the message would be available to subscribers.
+ *
+ * @param delivery_timepoint
+ * @return MessageBuilder&
+ */
+ MessageBuilder& availableAfter(std::chrono::system_clock::time_point delivery_timepoint);
+
MessageConstPtr build();
private:
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index cb70065..42004eb 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -28,6 +28,7 @@
#include "Message.h"
#include "SendCallback.h"
#include "SendReceipt.h"
+#include "Transaction.h"
#include "TransactionChecker.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -65,6 +66,10 @@ public:
*/
void send(MessageConstPtr message, const SendCallback& callback) noexcept;
+ std::unique_ptr<Transaction> beginTransaction();
+
+ void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
+
private:
explicit Producer(std::shared_ptr<ProducerImpl> impl) : impl_(std::move(impl)) {
}
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h
index 06a01cc..489df5e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -28,6 +28,8 @@ struct SendReceipt {
std::string message_id;
std::string transaction_id;
+
+ std::string target;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Transaction.h b/cpp/include/rocketmq/Transaction.h
index 2728d12..96cee61 100644
--- a/cpp/include/rocketmq/Transaction.h
+++ b/cpp/include/rocketmq/Transaction.h
@@ -33,16 +33,6 @@ public:
virtual bool commit() = 0;
virtual bool rollback() = 0;
-
- virtual const std::string& topic() const = 0;
-
- virtual const std::string& messageId() const = 0;
-
- virtual const std::string& transactionId() const = 0;
-
- virtual const std::string& traceContext() const = 0;
-
- virtual const std::string& endpoint() const = 0;
};
using TransactionPtr = std::unique_ptr<Transaction>;
diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp
index f20b4cd..e89af54 100644
--- a/cpp/source/base/Message.cpp
+++ b/cpp/source/base/Message.cpp
@@ -67,6 +67,11 @@ MessageBuilder& MessageBuilder::withProperties(std::unordered_map<std::string, s
return *this;
}
+MessageBuilder& MessageBuilder::availableAfter(std::chrono::system_clock::time_point delivery_timepoint) {
+ message_->delivery_timestamp_ = delivery_timepoint;
+ return *this;
+}
+
MessageConstPtr MessageBuilder::build() {
return std::move(message_);
}
diff --git a/cpp/source/base/ThreadPoolImpl.cpp b/cpp/source/base/ThreadPoolImpl.cpp
index 3befa9c..f07ba8d 100644
--- a/cpp/source/base/ThreadPoolImpl.cpp
+++ b/cpp/source/base/ThreadPoolImpl.cpp
@@ -59,7 +59,7 @@ void ThreadPoolImpl::start() {
}
#endif
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
- SPDLOG_INFO("A thread-pool worker quit");
+ SPDLOG_DEBUG("One thread-pool worker quit");
break;
}
}
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 44accf1..115fbd4 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -281,7 +281,7 @@ void ClientManagerImpl::doHeartbeat() {
bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
SendCallback cb) {
assert(cb);
- SPDLOG_DEBUG("Prepare to send message to {} asynchronously", target_host);
+ SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
// Invocation context will be deleted in its onComplete() method.
auto invocation_context = new InvocationContext<SendMessageResponse>();
@@ -293,8 +293,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
const std::string& topic = request.messages().begin()->topic().name();
std::weak_ptr<ClientManager> client_manager(shared_from_this());
- auto completion_callback = [topic, cb,
- client_manager](const InvocationContext<SendMessageResponse>* invocation_context) {
+ auto completion_callback = [topic, cb, client_manager,
+ target_host](const InvocationContext<SendMessageResponse>* invocation_context) {
ClientManagerPtr client_manager_ptr = client_manager.lock();
if (!client_manager_ptr) {
return;
@@ -305,7 +305,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
return;
}
- SendReceipt send_receipt;
+ SendReceipt send_receipt = {};
+ send_receipt.target = target_host;
std::error_code ec;
if (!invocation_context->status.ok()) {
SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
@@ -319,7 +320,13 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
auto&& status = invocation_context->response.status();
switch (invocation_context->response.status().code()) {
case rmq::Code::OK: {
- send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
+ if (!invocation_context->response.entries().empty()) {
+ auto first = invocation_context->response.entries().begin();
+ send_receipt.message_id = first->message_id();
+ send_receipt.transaction_id = first->transaction_id();
+ } else {
+ SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString());
+ }
break;
}
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index 20860e4..995df3a 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -97,7 +97,10 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("OnReadDone: ok={}", ok);
if (!ok) {
- SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
+ if (client_.lock()) {
+ SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
+ }
+
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::ReadDone;
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index cf2b4da..ce007bc 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -63,6 +63,14 @@ void Producer::send(MessageConstPtr message, const SendCallback& callback) noexc
impl_->send(std::move(message), callback);
}
+std::unique_ptr<Transaction> Producer::beginTransaction() {
+ return impl_->beginTransaction();
+}
+
+void Producer::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+ impl_->send(std::move(message), ec, transaction);
+}
+
ProducerBuilder Producer::newBuilder() {
return {};
}
@@ -83,6 +91,7 @@ ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>& top
}
ProducerBuilder& ProducerBuilder::withTransactionChecker(const TransactionChecker& checker) {
+ impl_->transaction_checker_ = checker;
return *this;
}
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 2cd4399..64bbd18 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -16,6 +16,8 @@
*/
#include "ProducerImpl.h"
+#include <apache/rocketmq/v2/definition.pb.h>
+
#include <atomic>
#include <cassert>
#include <chrono>
@@ -161,6 +163,8 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq
system_properties->set_message_type(rmq::MessageType::DELAY);
} else if (message.group().has_value()) {
system_properties->set_message_type(rmq::MessageType::FIFO);
+ } else if (message.extension().transactional) {
+ system_properties->set_message_type(rmq::MessageType::TRANSACTION);
} else {
system_properties->set_message_type(rmq::MessageType::NORMAL);
}
@@ -215,7 +219,7 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe
bool completed = false;
SendReceipt send_receipt;
- // Define callback procedureq
+ // Define callback
auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) {
ec = code;
send_receipt = receipt;
@@ -359,17 +363,15 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve
auto context = std::make_shared<SendContext>(shared_from_this(), std::move(message), callback, std::move(list));
sendImpl(context);
- // const_cast<Message&>(message).traceContext(
- // opencensus::trace::propagation::ToTraceParentHeader(context->span().context()));
}
-bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionState resolution) {
+bool ProducerImpl::endTransaction0(const MiniTransaction& transaction, TransactionState resolution) {
EndTransactionRequest request;
- const std::string& topic = transaction.topic();
+ const std::string& topic = transaction.topic;
request.mutable_topic()->set_name(topic);
request.mutable_topic()->set_resource_namespace(resourceNamespace());
- request.set_message_id(transaction.messageId());
- request.set_transaction_id(transaction.messageId());
+ request.set_message_id(transaction.message_id);
+ request.set_transaction_id(transaction.transaction_id);
std::string action;
switch (resolution) {
@@ -387,14 +389,14 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
bool completed = false;
bool success = false;
auto span = opencensus::trace::Span::BlankSpan();
- if (!transaction.traceContext().empty() && client_config_.sampler_) {
+ if (!transaction.trace_context.empty() && client_config_.sampler_) {
// Trace transactional message
opencensus::trace::SpanContext span_context =
- opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext());
+ opencensus::trace::propagation::FromTraceParentHeader(transaction.trace_context);
std::string trace_operation_name = TransactionState::COMMIT == resolution
? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
: MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
- std::string span_name = resourceNamespace() + "/" + transaction.topic() + " " + trace_operation_name;
+ std::string span_name = resourceNamespace() + "/" + transaction.topic + " " + trace_operation_name;
if (span_context.IsValid()) {
span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
} else {
@@ -407,7 +409,7 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
- const auto& endpoint = transaction.endpoint();
+ const auto& endpoint = transaction.target;
std::weak_ptr<ProducerImpl> publisher(shared_from_this());
auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
@@ -434,8 +436,8 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
}
};
- client_manager_->endTransaction(transaction.endpoint(), metadata, request,
- absl::ToChronoMilliseconds(requestTimeout()), cb);
+ client_manager_->endTransaction(transaction.target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()),
+ cb);
{
absl::MutexLock lk(mtx.get());
cv->Wait(mtx.get());
@@ -458,28 +460,33 @@ void ProducerImpl::isolateEndpoint(const std::string& target) {
isolated_endpoints_.insert(target);
}
-std::unique_ptr<TransactionImpl> ProducerImpl::prepare(MessageConstPtr message, std::error_code& ec) {
- std::weak_ptr<ProducerImpl> producer(shared_from_this());
- auto transaction = absl::make_unique<TransactionImpl>(message->topic(), message->id(),
- message->traceContext().value_or(""), producer);
- SendReceipt send_receipt = send(std::move(message), ec);
- if (ec) {
- return nullptr;
+void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) {
+ MiniTransaction mini = {};
+ mini.topic = message->topic();
+ mini.trace_context = message->traceContext().value_or("");
+
+ if (message->group().has_value()) {
+ ec = ErrorCode::MessagePropertyConflictWithType;
+ SPDLOG_WARN("FIFO message may not be transactional");
+ return;
}
- transaction->transactionId(send_receipt.transaction_id);
+ if (message->deliveryTimestamp().has_value()) {
+ ec = ErrorCode::MessagePropertyConflictWithType;
+ SPDLOG_WARN("Timed message may not be transactional");
+ return;
+ }
- // TODO: endpoint id
- // transaction->endpoint(xxx);
- return transaction;
-}
+ Message* msg = const_cast<Message*>(message.get());
+ msg->mutableExtension().transactional = true;
-bool ProducerImpl::commit(const Transaction& transaction) {
- return endTransaction0(transaction, TransactionState::COMMIT);
-}
+ SendReceipt send_receipt = send(std::move(message), ec);
-bool ProducerImpl::rollback(const Transaction& transaction) {
- return endTransaction0(transaction, TransactionState::ROLLBACK);
+ mini.message_id = send_receipt.message_id;
+ mini.transaction_id = send_receipt.transaction_id;
+ mini.target = send_receipt.target;
+ auto& impl = dynamic_cast<TransactionImpl&>(transaction);
+ impl.appendMiniTransaction(mini);
}
void ProducerImpl::getPublishInfoAsync(const std::string& topic, const PublishInfoCallback& cb) {
@@ -553,12 +560,14 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
if (transaction_checker_) {
std::weak_ptr<ProducerImpl> producer(shared_from_this());
- auto transaction = absl::make_unique<TransactionImpl>(message->topic(), message->id(),
- message->traceContext().value_or(""), producer);
- transaction->endpoint(message->extension().target_endpoint);
- transaction->transactionId(message->extension().transaction_id);
+ MiniTransaction transaction = {};
+ transaction.topic = message->topic();
+ transaction.message_id = message->id();
+ transaction.transaction_id = message->extension().transaction_id;
+ transaction.trace_context = message->traceContext().value_or("");
+ transaction.target = message->extension().target_endpoint;
TransactionState state = transaction_checker_(*message);
- endTransaction0(*transaction, state);
+ endTransaction0(transaction, state);
} else {
SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
}
diff --git a/cpp/source/rocketmq/TransactionImpl.cpp b/cpp/source/rocketmq/TransactionImpl.cpp
index 2d2bf66..0f1dcd9 100644
--- a/cpp/source/rocketmq/TransactionImpl.cpp
+++ b/cpp/source/rocketmq/TransactionImpl.cpp
@@ -26,7 +26,14 @@ bool TransactionImpl::commit() {
return false;
}
- return producer->commit(*this);
+ bool result = true;
+ {
+ absl::MutexLock lk(&pending_transactions_mtx_);
+ for (const auto& mini : pending_transactions_) {
+ result &= producer->endTransaction0(mini, TransactionState::COMMIT);
+ }
+ }
+ return result;
}
bool TransactionImpl::rollback() {
@@ -34,15 +41,15 @@ bool TransactionImpl::rollback() {
if (!producer) {
return false;
}
- return producer->rollback(*this);
-}
-const std::string& TransactionImpl::messageId() const {
- return message_id_;
-}
-
-const std::string& TransactionImpl::transactionId() const {
- return transaction_id_;
+ bool result = true;
+ {
+ absl::MutexLock lk(&pending_transactions_mtx_);
+ for (const auto& mini : pending_transactions_) {
+ result &= producer->endTransaction0(mini, TransactionState::ROLLBACK);
+ }
+ }
+ return result;
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index d3d865c..ad9b24d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -59,11 +59,12 @@ public:
void setTransactionChecker(TransactionChecker checker);
- std::unique_ptr<TransactionImpl> prepare(MessageConstPtr message, std::error_code& ec);
-
- bool commit(const Transaction& transaction);
+ std::unique_ptr<TransactionImpl> beginTransaction() {
+ auto producer = std::weak_ptr<ProducerImpl>(shared_from_this());
+ return absl::make_unique<TransactionImpl>(producer);
+ }
- bool rollback(const Transaction& transaction);
+ void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
/**
* Check if the RPC client for the target host is isolated or not
@@ -108,7 +109,11 @@ public:
void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
- const PublishStats& stats() const { return stats_; }
+ const PublishStats& stats() const {
+ return stats_;
+ }
+
+ bool endTransaction0(const MiniTransaction& transaction, TransactionState resolution);
protected:
std::shared_ptr<ClientImpl> self() override {
@@ -155,8 +160,6 @@ private:
void send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list);
- bool endTransaction0(const Transaction& transaction, TransactionState resolution);
-
void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) LOCKS_EXCLUDED(isolated_endpoints_mtx_);
friend class ProducerBuilder;
diff --git a/cpp/source/rocketmq/include/TransactionImpl.h b/cpp/source/rocketmq/include/TransactionImpl.h
index ce633ed..22e26d5 100644
--- a/cpp/source/rocketmq/include/TransactionImpl.h
+++ b/cpp/source/rocketmq/include/TransactionImpl.h
@@ -19,6 +19,8 @@
#include <memory>
#include <string>
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
#include "rocketmq/Message.h"
#include "rocketmq/Transaction.h"
@@ -26,10 +28,17 @@ ROCKETMQ_NAMESPACE_BEGIN
class ProducerImpl;
+struct MiniTransaction {
+ std::string topic;
+ std::string message_id;
+ std::string transaction_id;
+ std::string trace_context;
+ std::string target;
+};
+
class TransactionImpl : public Transaction {
public:
- TransactionImpl(std::string topic, std::string message_id, std::string trace_context, const std::weak_ptr<ProducerImpl>& producer)
- : topic_(std::move(topic)), message_id_(std::move(message_id)), trace_context_(std::move(trace_context)), producer_(producer) {
+ TransactionImpl(const std::weak_ptr<ProducerImpl>& producer) : producer_(producer) {
}
~TransactionImpl() override = default;
@@ -38,39 +47,15 @@ public:
bool rollback() override;
- const std::string& messageId() const override;
-
- const std::string& transactionId() const override;
-
- void transactionId(std::string transaction_id) {
- transaction_id_ = std::move(transaction_id);
- }
-
- const std::string& traceContext() const override {
- return trace_context_;
- }
-
- void traceContext(std::string trace_context) {
- trace_context_ = std::move(trace_context);
- }
-
- const std::string& endpoint() const override {
- return endpoint_;
- }
-
- void endpoint(std::string endpoint) { endpoint_ = std::move(endpoint); }
-
- const std::string& topic() const override {
- return topic_;
+ void appendMiniTransaction(MiniTransaction mini_transaction) LOCKS_EXCLUDED(pending_transactions_mtx_) {
+ absl::MutexLock lk(&pending_transactions_mtx_);
+ pending_transactions_.emplace_back(std::move(mini_transaction));
}
private:
- std::string topic_;
- std::string message_id_;
- std::string transaction_id_;
- std::string endpoint_;
- std::string trace_context_;
std::weak_ptr<ProducerImpl> producer_;
+ std::vector<MiniTransaction> pending_transactions_ GUARDED_BY(pending_transactions_mtx_);
+ absl::Mutex pending_transactions_mtx_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/scheduler/SchedulerImpl.cpp b/cpp/source/scheduler/SchedulerImpl.cpp
index 7eb4da3..1c036df 100644
--- a/cpp/source/scheduler/SchedulerImpl.cpp
+++ b/cpp/source/scheduler/SchedulerImpl.cpp
@@ -77,7 +77,7 @@ void SchedulerImpl::start() {
#endif
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
- SPDLOG_INFO("One scheduler worker thread quit");
+ SPDLOG_DEBUG("One scheduler worker thread quit");
break;
}
}