This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit b1d383aeaa5f4efd75a61d4c9abbb3e5eea47bb0 Author: Li Zhanhui <[email protected]> AuthorDate: Wed Jul 20 15:07:41 2022 +0800 Prepare to sync examples --- cpp/bazel/rocketmq_deps.bzl | 189 +++++++++++++----------- cpp/examples/BUILD.bazel | 31 +--- cpp/examples/BenchmarkPushConsumer.cpp | 83 ----------- cpp/examples/ExampleProducer.cpp | 21 ++- cpp/examples/PushConsumerWithCustomExecutor.cpp | 147 ------------------ cpp/examples/PushConsumerWithThrottle.cpp | 88 ----------- 6 files changed, 114 insertions(+), 445 deletions(-) diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index 939b97c..9306f94 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -8,86 +8,75 @@ def rocketmq_deps(): name = "opentelementry_api", actual = "@com_github_opentelemetry//api:api", ) + + maybe( + http_archive, + name = "com_google_googletest", + sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", + strip_prefix = "googletest-release-1.11.0", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz", + "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz", + ], + ) + + maybe( + http_archive, + name = "com_github_gulrak_filesystem", + strip_prefix = "filesystem-1.5.0", + sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz", + "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz", + ], + build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD", + ) + + maybe( + http_archive, + name = "com_github_gabime_spdlog", + strip_prefix = "spdlog-1.9.2", + sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz", + "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz", + ], + build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD", + ) + + maybe( + http_archive, + name = "com_github_fmtlib_fmt", + strip_prefix = "fmt-8.0.1", + sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz", + "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz", + ], + build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD", + ) + + maybe( + http_archive, + name = "com_google_protobuf", + sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930", + strip_prefix = "protobuf-3.20.1", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz", + "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz", + ], + ) - if "rules_python" not in native.existing_rules(): - http_archive( - name = "rules_python", - sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd", - strip_prefix = "rules_python-0.8.1", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz", - "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz", - ], - ) - - if "com_google_googletest" not in native.existing_rules(): - http_archive( - name = "com_google_googletest", - sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", - strip_prefix = "googletest-release-1.11.0", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz", - "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz", - ], - ) - - if "com_github_gulrak_filesystem" not in native.existing_rules(): - http_archive( - name = "com_github_gulrak_filesystem", - strip_prefix = "filesystem-1.5.0", - sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz", - "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz", - ], - build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD", - ) - - if "com_github_gabime_spdlog" not in native.existing_rules(): - http_archive( - name = "com_github_gabime_spdlog", - strip_prefix = "spdlog-1.9.2", - sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz", - "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz", - ], - build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD", - ) - - if "com_github_fmtlib_fmt" not in native.existing_rules(): - http_archive( - name = "com_github_fmtlib_fmt", - strip_prefix = "fmt-8.0.1", - sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz", - "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz", - ], - build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD", - ) - - if "com_google_protobuf" not in native.existing_rules(): - http_archive( - name = "com_google_protobuf", - sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930", - strip_prefix = "protobuf-3.20.1", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz", - "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz", - ], - ) - - if "rules_proto_grpc" not in native.existing_rules(): - http_archive( - name = "rules_proto_grpc", - sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731", - strip_prefix = "rules_proto_grpc-4.1.1", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz", - "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz" - ], - ) + maybe( + http_archive, + name = "rules_proto_grpc", + sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731", + strip_prefix = "rules_proto_grpc-4.1.1", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz", + "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz" + ], + ) maybe( http_archive, @@ -100,16 +89,27 @@ def rocketmq_deps(): strip_prefix = "opencensus-cpp-0.4.1", ) - if "com_google_absl" not in native.existing_rules(): - http_archive( - name = "com_google_absl", - sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4", - strip_prefix = "abseil-cpp-20211102.0", - urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz", - "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz", - ], - ) + maybe( + http_archive, + name = "com_google_absl", + sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4", + strip_prefix = "abseil-cpp-20211102.0", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz", + "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz", + ], + ) + + maybe( + http_archive, + name = "com_github_gflags_gflags", + strip_prefix = "gflags-2.2.2", + sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz", + "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz", + ] + ) maybe( http_archive, @@ -145,6 +145,17 @@ def rocketmq_deps(): strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95", ) + maybe( + http_archive, + name = "rules_python", + sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd", + strip_prefix = "rules_python-0.8.1", + urls = [ + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz", + "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz", + ], + ) + maybe( http_archive, name = "rules_swift", diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel index c2dbab4..5dd0a10 100644 --- a/cpp/examples/BUILD.bazel +++ b/cpp/examples/BUILD.bazel @@ -23,6 +23,7 @@ cc_binary( ], deps = [ "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", ], ) @@ -86,26 +87,6 @@ cc_binary( # ], # ) -# cc_binary( -# name = "push_consumer_with_custom_executor", -# srcs = [ -# "PushConsumerWithCustomExecutor.cpp", -# ], -# deps = [ -# "//source/rocketmq:rocketmq_library", -# ], -# ) - -# cc_binary( -# name = "push_consumer_with_throttle", -# srcs = [ -# "PushConsumerWithThrottle.cpp", -# ], -# deps = [ -# "//source/rocketmq:rocketmq_library", -# ], -# ) - # cc_binary( # name = "sql_consumer", # srcs = [ @@ -116,16 +97,6 @@ cc_binary( # ], # ) -# cc_binary( -# name = "benchmark_push_consumer", -# srcs = [ -# "BenchmarkPushConsumer.cpp", -# ], -# deps = [ -# "//source/rocketmq:rocketmq_library", -# ], -# ) - # cc_binary( # name = "example_transaction_producer", # srcs = [ diff --git a/cpp/examples/BenchmarkPushConsumer.cpp b/cpp/examples/BenchmarkPushConsumer.cpp deleted file mode 100644 index 400fa80..0000000 --- a/cpp/examples/BenchmarkPushConsumer.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rocketmq/DefaultMQPushConsumer.h" - -#include <atomic> -#include <chrono> -#include <iostream> -#include <mutex> -#include <thread> - -using namespace rocketmq; - -class CounterMessageListener : public StandardMessageListener { -public: - explicit CounterMessageListener(std::atomic_long& counter) : counter_(counter) { - } - - ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override { - counter_.fetch_add(msgs.size()); - return ConsumeMessageResult::SUCCESS; - } - -private: - std::atomic_long& counter_; -}; - -int main(int argc, char* argv[]) { - - Logger& logger = getLogger(); - logger.setLevel(Level::Debug); - logger.init(); - - std::atomic_long counter(0); - - DefaultMQPushConsumer push_consumer("CID_sample"); - MessageListener* listener = new CounterMessageListener(counter); - - push_consumer.setGroupName("CID_sample"); - push_consumer.setInstanceName("CID_sample_member_0"); - push_consumer.subscribe("TopicTest", "*"); - push_consumer.setNamesrvAddr("11.167.164.105:9876"); - push_consumer.registerMessageListener(listener); - push_consumer.start(); - - std::atomic_bool stopped(false); - std::thread report_thread([&counter, &stopped]() { - while (!stopped) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - long qps; - while (true) { - qps = counter.load(std::memory_order_relaxed); - if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) { - break; - } - } - std::cout << "QPS: " << qps << std::endl; - } - }); - - std::this_thread::sleep_for(std::chrono::minutes(30)); - stopped.store(true); - - if (report_thread.joinable()) { - report_thread.join(); - } - - push_consumer.shutdown(); - return EXIT_SUCCESS; -} diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 9bdb9b4..d44f041 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -20,6 +20,7 @@ #include <random> #include <system_error> +#include "gflags/gflags.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" @@ -46,12 +47,15 @@ std::string randomString(std::string::size_type len) { return result; } +DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are published"); +DEFINE_string(access_point, "121.196.167.124:8081", "Access URL, provided by your service provider"); + int main(int argc, char* argv[]) { - const char* topic = "lingchu_normal_topic"; - const char* name_server = "121.196.167.124:8081"; + gflags::ParseCommandLineFlags(&argc, &argv, true); - auto producer = - Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build(); + auto producer = Producer::newBuilder() + .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build()) + .build(); std::atomic_bool stopped; std::atomic_long count(0); @@ -74,15 +78,16 @@ int main(int argc, char* argv[]) { try { for (int i = 0; i < 256; ++i) { - auto message = Message::newBuilder().withTopic(topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build(); + auto message = + Message::newBuilder().withTopic(FLAGS_topic).withTag("TagA").withKeys({"Key-0"}).withBody(body).build(); std::error_code ec; SendReceipt send_receipt = producer.send(std::move(message), ec); std::cout << "Message-ID: " << send_receipt.message_id << std::endl; count++; } - } catch (...) { - std::cerr << "Ah...No!!!" << std::endl; - } + } catch (...) { + std::cerr << "Ah...No!!!" << std::endl; + } stopped.store(true, std::memory_order_relaxed); if (stats_thread.joinable()) { stats_thread.join(); diff --git a/cpp/examples/PushConsumerWithCustomExecutor.cpp b/cpp/examples/PushConsumerWithCustomExecutor.cpp deleted file mode 100644 index 82dd57d..0000000 --- a/cpp/examples/PushConsumerWithCustomExecutor.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rocketmq/DefaultMQPushConsumer.h" -#include "rocketmq/State.h" -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <iostream> -#include <mutex> -#include <thread> - -ROCKETMQ_NAMESPACE_BEGIN - -class ExecutorImpl { -public: - ExecutorImpl() : state_(State::CREATED) { - } - - virtual ~ExecutorImpl() { - switch (state_.load(std::memory_order_relaxed)) { - case CREATED: - case STOPPING: - case STOPPED: - break; - - case STARTING: - case STARTED: - state_.store(State::STOPPED); - if (worker_.joinable()) { - worker_.join(); - } - break; - } - } - - void submit(const std::function<void(void)>& task) { - if (State::STOPPED == state_.load(std::memory_order_relaxed)) { - return; - } - - { - std::unique_lock<std::mutex> lock(task_mtx_); - tasks_.push_back(task); - } - cv_.notify_one(); - } - - void start() { - State expected = State::CREATED; - if (state_.compare_exchange_strong(expected, State::STARTING)) { - worker_ = std::thread(std::bind(&ExecutorImpl::loop, this)); - state_.store(State::STARTED); - } - } - - void stop() { - state_.store(State::STOPPED); - if (worker_.joinable()) { - worker_.join(); - } - } - -private: - void loop() { - while (state_.load(std::memory_order_relaxed) != State::STOPPED) { - std::function<void(void)> func; - { - std::unique_lock<std::mutex> lk(task_mtx_); - if (!tasks_.empty()) { - func = tasks_.back(); - } - } - - if (func) { - func(); - } else { - std::unique_lock<std::mutex> lk(task_mtx_); - cv_.wait_for(lk, std::chrono::seconds(3), - [&]() { return state_.load(std::memory_order_relaxed) == State::STOPPED || !tasks_.empty(); }); - } - } - } - - std::atomic<State> state_; - std::vector<std::function<void(void)>> tasks_; - std::mutex task_mtx_; - std::condition_variable cv_; - std::thread worker_; -}; - -class SampleMQMessageListener : public StandardMessageListener { -public: - ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override { - std::lock_guard<std::mutex> lk(console_mtx_); - for (const MQMessageExt& msg : msgs) { - std::cout << "Topic=" << msg.getTopic() << ", MsgId=" << msg.getMsgId() << ", Body=" << msg.getBody() - << std::endl; - } - return ConsumeMessageResult::SUCCESS; - } - -private: - std::mutex console_mtx_; -}; - -ROCKETMQ_NAMESPACE_END - -int main(int argc, char* argv[]) { - using namespace ROCKETMQ_NAMESPACE; - Logger& logger = getLogger(); - logger.setLevel(Level::Debug); - logger.init(); - - DefaultMQPushConsumer push_consumer("TestGroup"); - MessageListener* listener = new SampleMQMessageListener; - - auto pool = new ExecutorImpl; - pool->start(); - push_consumer.setCustomExecutor(std::bind(&ExecutorImpl::submit, pool, std::placeholders::_1)); - push_consumer.setGroupName("TestGroup"); - push_consumer.setInstanceName("CID_sample_member_0"); - push_consumer.subscribe("TestTopic", "*"); - push_consumer.setNamesrvAddr("11.167.164.105:9876"); - push_consumer.registerMessageListener(listener); - push_consumer.start(); - - std::this_thread::sleep_for(std::chrono::minutes(30)); - pool->stop(); - delete pool; - - push_consumer.shutdown(); - return EXIT_SUCCESS; -} diff --git a/cpp/examples/PushConsumerWithThrottle.cpp b/cpp/examples/PushConsumerWithThrottle.cpp deleted file mode 100644 index c03b6c5..0000000 --- a/cpp/examples/PushConsumerWithThrottle.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rocketmq/DefaultMQPushConsumer.h" - -#include <atomic> -#include <chrono> -#include <iostream> -#include <mutex> -#include <thread> - -ROCKETMQ_NAMESPACE_BEGIN - -class CounterMessageListener : public StandardMessageListener { -public: - explicit CounterMessageListener(std::atomic_long& counter) : counter_(counter) { - } - - ConsumeMessageResult consumeMessage(const std::vector<MQMessageExt>& msgs) override { - counter_.fetch_add(msgs.size()); - return ConsumeMessageResult::SUCCESS; - } - -private: - std::atomic_long& counter_; -}; - -ROCKETMQ_NAMESPACE_END - -int main(int argc, char* argv[]) { - - using namespace ROCKETMQ_NAMESPACE; - - Logger& logger = getLogger(); - logger.setLevel(Level::Debug); - logger.init(); - - std::atomic_long counter(0); - - DefaultMQPushConsumer push_consumer("TestGroup"); - MessageListener* listener = new CounterMessageListener(counter); - - push_consumer.setGroupName("TestGroup"); - push_consumer.setInstanceName("CID_sample_member_0"); - push_consumer.subscribe("TestTopic", "*"); - push_consumer.setNamesrvAddr("11.167.164.105:9876"); - push_consumer.registerMessageListener(listener); - push_consumer.setThrottle("TestTopic", 20); - push_consumer.start(); - - std::atomic_bool stopped(false); - std::thread report_thread([&counter, &stopped]() { - while (!stopped) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - long qps; - while (true) { - qps = counter.load(std::memory_order_relaxed); - if (counter.compare_exchange_weak(qps, 0, std::memory_order_relaxed)) { - break; - } - } - std::cout << "QPS: " << qps << std::endl; - } - }); - - std::this_thread::sleep_for(std::chrono::seconds(10)); - stopped.store(true); - - if (report_thread.joinable()) { - report_thread.join(); - } - - push_consumer.shutdown(); - return EXIT_SUCCESS; -}
