This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit b1d383aeaa5f4efd75a61d4c9abbb3e5eea47bb0
Author: Li Zhanhui <[email protected]>
AuthorDate: Wed Jul 20 15:07:41 2022 +0800

    Prepare to sync examples
---
 cpp/bazel/rocketmq_deps.bzl                     | 189 +++++++++++++-----------
 cpp/examples/BUILD.bazel                        |  31 +---
 cpp/examples/BenchmarkPushConsumer.cpp          |  83 -----------
 cpp/examples/ExampleProducer.cpp                |  21 ++-
 cpp/examples/PushConsumerWithCustomExecutor.cpp | 147 ------------------
 cpp/examples/PushConsumerWithThrottle.cpp       |  88 -----------
 6 files changed, 114 insertions(+), 445 deletions(-)

diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 939b97c..9306f94 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -8,86 +8,75 @@ def rocketmq_deps():
         name = "opentelementry_api",
         actual = "@com_github_opentelemetry//api:api",
     )
+    
+    maybe(
+        http_archive,
+        name = "com_google_googletest",
+        sha256 = 
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
+        strip_prefix = "googletest-release-1.11.0",
+        urls = [
+        
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz";,
+        
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz";,
+        ],
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gulrak_filesystem",
+        strip_prefix = "filesystem-1.5.0",
+        sha256 = 
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz";,
+            "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gabime_spdlog",
+        strip_prefix = "spdlog-1.9.2",
+        sha256 = 
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz";,
+            "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_fmtlib_fmt",
+        strip_prefix = "fmt-8.0.1",
+        sha256 = 
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz";,
+            "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz";,
+        ],
+        build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
+    )
+
+    maybe(
+        http_archive,
+        name = "com_google_protobuf",
+        sha256 = 
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
+        strip_prefix = "protobuf-3.20.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz";,
+            
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz";,
+        ],
+    )
 
-    if "rules_python" not in native.existing_rules():
-        http_archive(
-            name = "rules_python",
-            sha256 = 
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
-            strip_prefix = "rules_python-0.8.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz";,
-                
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz";,
-            ],
-        )
-
-    if "com_google_googletest" not in native.existing_rules():
-         http_archive(
-             name = "com_google_googletest",
-             sha256 = 
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
-             strip_prefix = "googletest-release-1.11.0",
-             urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz";,
-                
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz";,
-             ],
-         )
-
-    if "com_github_gulrak_filesystem" not in native.existing_rules():
-        http_archive(
-            name = "com_github_gulrak_filesystem",
-            strip_prefix = "filesystem-1.5.0",
-            sha256 = 
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz";,
-                "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
-        )
-
-    if "com_github_gabime_spdlog" not in native.existing_rules():
-        http_archive(
-            name = "com_github_gabime_spdlog",
-            strip_prefix = "spdlog-1.9.2",
-            sha256 = 
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz";,
-                
"https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
-        )
-
-    if "com_github_fmtlib_fmt" not in native.existing_rules():
-        http_archive(
-            name = "com_github_fmtlib_fmt",
-            strip_prefix = "fmt-8.0.1",
-            sha256 = 
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz";,
-                "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz";,
-            ],
-            build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
-        )
-
-    if "com_google_protobuf" not in native.existing_rules():
-        http_archive(
-            name = "com_google_protobuf",
-            sha256 = 
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
-            strip_prefix = "protobuf-3.20.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz";,
-                
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz";,
-            ],
-        )
-
-    if "rules_proto_grpc" not in native.existing_rules():
-        http_archive(
-            name = "rules_proto_grpc",
-            sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
-            strip_prefix = "rules_proto_grpc-4.1.1",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz";,
-                
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz";
-            ],
-        )
+    maybe(
+        http_archive,
+        name = "rules_proto_grpc",
+        sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
+        strip_prefix = "rules_proto_grpc-4.1.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz";,
+            
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz";
+        ],
+    )
 
     maybe(
         http_archive,
@@ -100,16 +89,27 @@ def rocketmq_deps():
         strip_prefix = "opencensus-cpp-0.4.1",
     )
 
-    if "com_google_absl" not in native.existing_rules():
-        http_archive(
-            name = "com_google_absl",
-            sha256 = 
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
-            strip_prefix = "abseil-cpp-20211102.0",
-            urls = [
-                
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz";,
-                
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz";,
-            ],
-        )
+    maybe(
+        http_archive,
+        name = "com_google_absl",
+        sha256 = 
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
+        strip_prefix = "abseil-cpp-20211102.0",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz";,
+            
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz";,
+        ],
+    )
+
+    maybe(
+        http_archive,
+        name = "com_github_gflags_gflags",
+        strip_prefix = "gflags-2.2.2",
+        sha256 = 
"34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz";,
+            "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz";,
+        ]
+    )
 
     maybe(
         http_archive,
@@ -145,6 +145,17 @@ def rocketmq_deps():
         strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
     )
 
+    maybe(
+        http_archive,
+        name = "rules_python",
+        sha256 = 
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
+        strip_prefix = "rules_python-0.8.1",
+        urls = [
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz";,
+            
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz";,
+        ],
+    )
+
     maybe(
         http_archive,
         name = "rules_swift",
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index c2dbab4..5dd0a10 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -23,6 +23,7 @@ cc_binary(
     ],
     deps = [
         "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
     ],
 )
 
@@ -86,26 +87,6 @@ cc_binary(
 #     ],
 # )
 
-# cc_binary(
-#     name = "push_consumer_with_custom_executor",
-#     srcs = [
-#         "PushConsumerWithCustomExecutor.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
-# cc_binary(
-#     name = "push_consumer_with_throttle",
-#     srcs = [
-#         "PushConsumerWithThrottle.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
 # cc_binary(
 #     name = "sql_consumer",
 #     srcs = [
@@ -116,16 +97,6 @@ cc_binary(
 #     ],
 # )
 
-# cc_binary(
-#     name = "benchmark_push_consumer",
-#     srcs = [
-#         "BenchmarkPushConsumer.cpp",
-#     ],
-#     deps = [
-#         "//source/rocketmq:rocketmq_library",
-#     ],
-# )
-
 # cc_binary(
 #     name = "example_transaction_producer",
 #     srcs = [
diff --git a/cpp/examples/BenchmarkPushConsumer.cpp 
b/cpp/examples/BenchmarkPushConsumer.cpp
deleted file mode 100644
index 400fa80..0000000
--- a/cpp/examples/BenchmarkPushConsumer.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-using namespace rocketmq;
-
-class CounterMessageListener : public StandardMessageListener {
-public:
-  explicit CounterMessageListener(std::atomic_long& counter) : 
counter_(counter) {
-  }
-
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    counter_.fetch_add(msgs.size());
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::atomic_long& counter_;
-};
-
-int main(int argc, char* argv[]) {
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  std::atomic_long counter(0);
-
-  DefaultMQPushConsumer push_consumer("CID_sample");
-  MessageListener* listener = new CounterMessageListener(counter);
-
-  push_consumer.setGroupName("CID_sample");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TopicTest", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.start();
-
-  std::atomic_bool stopped(false);
-  std::thread report_thread([&counter, &stopped]() {
-    while (!stopped) {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      long qps;
-      while (true) {
-        qps = counter.load(std::memory_order_relaxed);
-        if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
-          break;
-        }
-      }
-      std::cout << "QPS: " << qps << std::endl;
-    }
-  });
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-  stopped.store(true);
-
-  if (report_thread.joinable()) {
-    report_thread.join();
-  }
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 9bdb9b4..d44f041 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -20,6 +20,7 @@
 #include <random>
 #include <system_error>
 
+#include "gflags/gflags.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
 
@@ -46,12 +47,15 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
+DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Access URL, provided by 
your service provider");
+
 int main(int argc, char* argv[]) {
-  const char* topic = "lingchu_normal_topic";
-  const char* name_server = "121.196.167.124:8081";
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
 
-  auto producer =
-      
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
+  auto producer = Producer::newBuilder()
+                      
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .build();
 
   std::atomic_bool stopped;
   std::atomic_long count(0);
@@ -74,15 +78,16 @@ int main(int argc, char* argv[]) {
 
   try {
     for (int i = 0; i < 256; ++i) {
-      auto message = 
Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
+      auto message =
+          
Message::newBuilder().withTopic(FLAGS_topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build();
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
       std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
       count++;
     }
-  } catch (...) {
-    std::cerr << "Ah...No!!!" << std::endl;
-  }
+      } catch (...) {
+        std::cerr << "Ah...No!!!" << std::endl;
+      }
   stopped.store(true, std::memory_order_relaxed);
   if (stats_thread.joinable()) {
     stats_thread.join();
diff --git a/cpp/examples/PushConsumerWithCustomExecutor.cpp 
b/cpp/examples/PushConsumerWithCustomExecutor.cpp
deleted file mode 100644
index 82dd57d..0000000
--- a/cpp/examples/PushConsumerWithCustomExecutor.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-#include "rocketmq/State.h"
-#include <atomic>
-#include <chrono>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ExecutorImpl {
-public:
-  ExecutorImpl() : state_(State::CREATED) {
-  }
-
-  virtual ~ExecutorImpl() {
-    switch (state_.load(std::memory_order_relaxed)) {
-      case CREATED:
-      case STOPPING:
-      case STOPPED:
-        break;
-
-      case STARTING:
-      case STARTED:
-        state_.store(State::STOPPED);
-        if (worker_.joinable()) {
-          worker_.join();
-        }
-        break;
-    }
-  }
-
-  void submit(const std::function<void(void)>& task) {
-    if (State::STOPPED == state_.load(std::memory_order_relaxed)) {
-      return;
-    }
-
-    {
-      std::unique_lock<std::mutex> lock(task_mtx_);
-      tasks_.push_back(task);
-    }
-    cv_.notify_one();
-  }
-
-  void start() {
-    State expected = State::CREATED;
-    if (state_.compare_exchange_strong(expected, State::STARTING)) {
-      worker_ = std::thread(std::bind(&ExecutorImpl::loop, this));
-      state_.store(State::STARTED);
-    }
-  }
-
-  void stop() {
-    state_.store(State::STOPPED);
-    if (worker_.joinable()) {
-      worker_.join();
-    }
-  }
-
-private:
-  void loop() {
-    while (state_.load(std::memory_order_relaxed) != State::STOPPED) {
-      std::function<void(void)> func;
-      {
-        std::unique_lock<std::mutex> lk(task_mtx_);
-        if (!tasks_.empty()) {
-          func = tasks_.back();
-        }
-      }
-
-      if (func) {
-        func();
-      } else {
-        std::unique_lock<std::mutex> lk(task_mtx_);
-        cv_.wait_for(lk, std::chrono::seconds(3),
-                     [&]() { return state_.load(std::memory_order_relaxed) == 
State::STOPPED || !tasks_.empty(); });
-      }
-    }
-  }
-
-  std::atomic<State> state_;
-  std::vector<std::function<void(void)>> tasks_;
-  std::mutex task_mtx_;
-  std::condition_variable cv_;
-  std::thread worker_;
-};
-
-class SampleMQMessageListener : public StandardMessageListener {
-public:
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    std::lock_guard<std::mutex> lk(console_mtx_);
-    for (const MQMessageExt& msg : msgs) {
-      std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() 
<< ", Body=" << msg.getBody()
-                << std::endl;
-    }
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::mutex console_mtx_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
-  using namespace ROCKETMQ_NAMESPACE;
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  DefaultMQPushConsumer push_consumer("TestGroup");
-  MessageListener* listener = new SampleMQMessageListener;
-
-  auto pool = new ExecutorImpl;
-  pool->start();
-  push_consumer.setCustomExecutor(std::bind(&ExecutorImpl::submit, pool, 
std::placeholders::_1));
-  push_consumer.setGroupName("TestGroup");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TestTopic", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.start();
-
-  std::this_thread::sleep_for(std::chrono::minutes(30));
-  pool->stop();
-  delete pool;
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}
diff --git a/cpp/examples/PushConsumerWithThrottle.cpp 
b/cpp/examples/PushConsumerWithThrottle.cpp
deleted file mode 100644
index c03b6c5..0000000
--- a/cpp/examples/PushConsumerWithThrottle.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/DefaultMQPushConsumer.h"
-
-#include <atomic>
-#include <chrono>
-#include <iostream>
-#include <mutex>
-#include <thread>
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class CounterMessageListener : public StandardMessageListener {
-public:
-  explicit CounterMessageListener(std::atomic_long& counter) : 
counter_(counter) {
-  }
-
-  ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) 
override {
-    counter_.fetch_add(msgs.size());
-    return ConsumeMessageResult::SUCCESS;
-  }
-
-private:
-  std::atomic_long& counter_;
-};
-
-ROCKETMQ_NAMESPACE_END
-
-int main(int argc, char* argv[]) {
-
-  using namespace ROCKETMQ_NAMESPACE;
-
-  Logger& logger = getLogger();
-  logger.setLevel(Level::Debug);
-  logger.init();
-
-  std::atomic_long counter(0);
-
-  DefaultMQPushConsumer push_consumer("TestGroup");
-  MessageListener* listener = new CounterMessageListener(counter);
-
-  push_consumer.setGroupName("TestGroup");
-  push_consumer.setInstanceName("CID_sample_member_0");
-  push_consumer.subscribe("TestTopic", "*");
-  push_consumer.setNamesrvAddr("11.167.164.105:9876");
-  push_consumer.registerMessageListener(listener);
-  push_consumer.setThrottle("TestTopic", 20);
-  push_consumer.start();
-
-  std::atomic_bool stopped(false);
-  std::thread report_thread([&counter, &stopped]() {
-    while (!stopped) {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      long qps;
-      while (true) {
-        qps = counter.load(std::memory_order_relaxed);
-        if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) {
-          break;
-        }
-      }
-      std::cout << "QPS: " << qps << std::endl;
-    }
-  });
-
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-  stopped.store(true);
-
-  if (report_thread.joinable()) {
-    report_thread.join();
-  }
-
-  push_consumer.shutdown();
-  return EXIT_SUCCESS;
-}

Reply via email to