This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_sync_consumer_examples in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f2a85b759f8e9ff2462188975332fac8073f2fab Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jul 21 09:57:07 2022 +0800 Sync consumer examples --- cpp/README.md | 25 +++++++++++++++++++++++-- cpp/examples/BUILD.bazel | 2 ++ cpp/examples/ExampleProducer.cpp | 6 ++++++ cpp/examples/ExamplePushConsumer.cpp | 27 +++++++++++++++++++-------- cpp/examples/ExampleSimpleConsumer.cpp | 22 ++++++++++++++++------ 5 files changed, 66 insertions(+), 16 deletions(-) diff --git a/cpp/README.md b/cpp/README.md index fa92392..8d79caf 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -58,7 +58,10 @@ if "com_google_googletest" not in native.existing_rules(): bazel test //... ``` -3. Run Examples + + +### Run Examples + From the workspace, Publish standard messages to your topic synchronously @@ -87,12 +90,30 @@ if "com_google_googletest" not in native.existing_rules(): ----------- - Publish Transactional messages + Publish transactional messages ``` bazel run //examples:example_producer_with_transactional_message -- --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16 ``` where `1024` are in bytes + + ------------ + + Consume messages through Message Listener + + ``` + bazel run //examples:example_push_consumer -- --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID + ``` + + ------------ + + Consume messages through raw, atomic API + + ``` + bazel run //examples:example_simple_consumer -- --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID + ``` + + ### IDE diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel index 939f0a2..5113e15 100644 --- a/cpp/examples/BUILD.bazel +++ b/cpp/examples/BUILD.bazel @@ -78,6 +78,7 @@ cc_binary( ], deps = [ "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", ], ) @@ -88,5 +89,6 @@ cc_binary( ], deps = [ "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", ], ) \ No newline at end of file diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index b82cfce..6284011 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -22,6 +22,7 @@ #include <system_error> #include "gflags/gflags.h" +#include "rocketmq/Logger.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" @@ -56,6 +57,11 @@ DEFINE_uint32(total, 256, "Number of sample messages to publish"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); + logger.init(); + auto producer = Producer::newBuilder() .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build()) .build(); diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 00954ff..793436c 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -19,29 +19,40 @@ #include <mutex> #include <thread> +#include "gflags/gflags.h" #include "rocketmq/Logger.h" #include "rocketmq/PushConsumer.h" -#include "spdlog/spdlog.h" using namespace ROCKETMQ_NAMESPACE; +DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); +DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); + int main(int argc, char* argv[]) { - const char* topic = "cpp_sdk_standard"; - const char* name_server = "11.166.42.94:8081"; - const char* group = "GID_cpp_sdk_standard"; + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); + logger.init(); + std::string tag = "*"; - auto listener = [](const Message& message) { return ConsumeResult::SUCCESS; }; + auto listener = [](const Message& message) { + std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl; + return ConsumeResult::SUCCESS; + }; auto push_consumer = PushConsumer::newBuilder() - .withGroup(group) + .withGroup(FLAGS_group) .withConfiguration(Configuration::newBuilder() - .withEndpoints(name_server) + .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .build()) .withConsumeThreads(4) .withListener(listener) - .subscribe(topic, tag) + .subscribe(FLAGS_topic, tag) .build(); std::this_thread::sleep_for(std::chrono::minutes(30)); diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index dc76718..4cbea38 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -18,20 +18,30 @@ #include <iostream> #include <thread> +#include "gflags/gflags.h" +#include "rocketmq/Logger.h" #include "rocketmq/SimpleConsumer.h" using namespace ROCKETMQ_NAMESPACE; +DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); +DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); + int main(int argc, char* argv[]) { - const char* group = "ExampleSimpleGroup"; - const char* topic = "cpp_sdk_standard"; - const char* name_server = "11.166.42.94:8081"; + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); + logger.init(); + std::string tag = "*"; auto simple_consumer = SimpleConsumer::newBuilder() - .withGroup(group) - .withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()) - .subscribe(topic, tag) + .withGroup(FLAGS_group) + .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build()) + .subscribe(FLAGS_topic, tag) .build(); std::vector<MessageConstSharedPtr> messages; std::error_code ec;
