This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop-cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 99c505a6f26678db9d9d29878276324106dd797e Author: Zhanhui Li <[email protected]> AuthorDate: Thu Mar 14 11:12:21 2024 +0800 Fix C++ SDK core dump issue (#2) * fix: sync namespace from server Settings * feat: use opentelemetry for tracing/metrics/logging * Remove broken links and add targets to generate compile_commands.json Signed-off-by: lizhanhui <[email protected]> * fix: timer task may invoke a call to a destructing stream Signed-off-by: Zhanhui Li <[email protected]> * fix: update document as we have changed the way to generate compile_commands.json * fix: static_cast StreamState to std::uint8_t as enum class by default is not formattable Signed-off-by: Li Zhanhui <[email protected]> --------- Signed-off-by: lizhanhui <[email protected]> Signed-off-by: Zhanhui Li <[email protected]> Signed-off-by: Li Zhanhui <[email protected]> --- cpp/CMakeLists.txt | 4 +- cpp/README.md | 25 ++++--- cpp/WORKSPACE | 26 +++++++- cpp/bazel/rocketmq_deps.bzl | 25 +++---- cpp/examples/ExampleProducer.cpp | 3 +- cpp/examples/ExampleProducerWithAsync.cpp | 3 +- cpp/examples/ExampleProducerWithFifoMessage.cpp | 3 +- cpp/examples/ExampleProducerWithTimedMessage.cpp | 3 +- .../ExampleProducerWithTransactionalMessage.cpp | 3 +- cpp/examples/ExamplePushConsumer.cpp | 3 +- cpp/examples/ExampleSimpleConsumer.cpp | 3 +- cpp/source/client/ClientManagerImpl.cpp | 76 ++++++++++++++-------- cpp/source/client/TelemetryBidiReactor.cpp | 37 ++++++++++- cpp/source/client/include/TelemetryBidiReactor.h | 2 + cpp/source/rocketmq/include/SimpleConsumerImpl.h | 2 +- cpp/tools/gen_compile_commands.sh | 5 ++ 16 files changed, 154 insertions(+), 69 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 374466e6..42f7cd70 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.19) +cmake_minimum_required(VERSION 3.16) project(rocketmq) set(CMAKE_CXX_STANDARD 11) set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -31,4 +31,4 @@ if (BUILD_EXAMPLES) find_package(gflags REQUIRED) find_package(ZLIB REQUIRED) add_subdirectory(examples) -endif () \ No newline at end of file +endif () diff --git a/cpp/README.md b/cpp/README.md index 117a2426..2ed65c2e 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -217,19 +217,11 @@ if "com_google_googletest" not in native.existing_rules(): 1. VSCode + Clangd [Clangd](https://clangd.llvm.org/) is a really nice code completion tool. Clangd requires compile_commands.json to work properly. - To generate the file, we need clone another repository along with the current one. - - ```sh - git clone [email protected]:grailbio/bazel-compilation-database.git - ``` - - From current repository root, - + To generate the file, run the following command: ```sh - ../bazel-compilation-database/generate.sh + ./tools/gen_compile_commands.sh ``` - - Once the script completes, you should have compile_commands.json file in the repository root directory. + Once the script completes, you should have compile_commands.json file in the workspace directory, aka, ${repository}/cpp. LLVM project has an extension for [clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd). Please install it from the extension market. @@ -239,8 +231,15 @@ if "com_google_googletest" not in native.existing_rules(): "C_Cpp.intelliSenseEngine": "Disabled", "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from both extensions. "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles from both extensions (clangd's seem to be more reliable anyway). - "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd", - "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"], + "clangd.path": "/usr/bin/clangd", + "clangd.arguments": [ + "-log=verbose", + "-pretty", + "--background-index", + "--header-insertion=never", + "--compile-commands-dir=${workspaceFolder}/", + "--query-driver=**" + ], "clangd.onConfigChanged": "restart", ``` diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE index d09dd445..3c3d6476 100644 --- a/cpp/WORKSPACE +++ b/cpp/WORKSPACE @@ -27,4 +27,28 @@ http_archive( load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps") buildbuddy_deps() load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy") -buildbuddy(name = "buildbuddy_toolchain") \ No newline at end of file +buildbuddy(name = "buildbuddy_toolchain") + +# Generate compile_commands.json +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +http_archive( + name = "hedron_compile_commands", + + # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in both places (below) with the latest (https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main), rather than using the stale one here. + # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README). + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz", + strip_prefix = "bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9", + # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..." +) +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") +hedron_compile_commands_setup() +load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive") +hedron_compile_commands_setup_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive_transitive() \ No newline at end of file diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index eae31a6f..684e55eb 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -16,7 +16,6 @@ def rocketmq_deps(): sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", strip_prefix = "googletest-release-1.11.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz", "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz", ], ) @@ -27,7 +26,6 @@ def rocketmq_deps(): strip_prefix = "filesystem-1.5.0", sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz", "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD", @@ -39,7 +37,6 @@ def rocketmq_deps(): strip_prefix = "spdlog-1.9.2", sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz", "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD", @@ -51,7 +48,6 @@ def rocketmq_deps(): strip_prefix = "fmt-8.0.1", sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz", "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD", @@ -63,7 +59,6 @@ def rocketmq_deps(): sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930", strip_prefix = "protobuf-3.20.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz", "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz", ], ) @@ -74,7 +69,6 @@ def rocketmq_deps(): sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731", strip_prefix = "rules_proto_grpc-4.1.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz", "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz", ], ) @@ -84,7 +78,6 @@ def rocketmq_deps(): name = "io_opencensus_cpp", sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz", "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz", ], strip_prefix = "opencensus-cpp-0.4.1", @@ -96,7 +89,6 @@ def rocketmq_deps(): sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4", strip_prefix = "abseil-cpp-20211102.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz", "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz", ], ) @@ -107,7 +99,6 @@ def rocketmq_deps(): strip_prefix = "gflags-2.2.2", sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz", "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz", ], ) @@ -118,7 +109,6 @@ def rocketmq_deps(): strip_prefix = "grpc-1.46.3", sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz", "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz", ], ) @@ -130,7 +120,6 @@ def rocketmq_deps(): build_file = "@org_apache_rocketmq//third_party:asio.BUILD", strip_prefix = "asio-1.18.2", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz", "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz", ], ) @@ -140,7 +129,6 @@ def rocketmq_deps(): name = "com_google_googleapis", sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", ], strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95", @@ -152,7 +140,6 @@ def rocketmq_deps(): sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd", strip_prefix = "rules_python-0.8.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz", "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz", ], ) @@ -161,7 +148,6 @@ def rocketmq_deps(): http_archive, name = "rules_swift", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz", "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz", ], strip_prefix = "rules_swift-0.27.0", @@ -172,7 +158,6 @@ def rocketmq_deps(): name = "io_bazel_rules_go", sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip", "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", ], @@ -184,7 +169,15 @@ def rocketmq_deps(): sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d", strip_prefix = "rules_proto-4.0.0-3.20.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz", "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz", ], ) + + maybe( + http_archive, + name = "com_github_opentelemetry", + strip_prefix = "opentelemetry-cpp-1.14.2", + urls = [ + "https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz" + ] + ) diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 57293c24..2e170ce5 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -77,7 +78,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index 62ee7781..5e9cc12d 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleProducerWithAsync.cpp @@ -97,6 +97,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_uint32(concurrency, 128, "Concurrency of async send"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -116,7 +117,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp index 09b8d407..f45b2d12 100644 --- a/cpp/examples/ExampleProducerWithFifoMessage.cpp +++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -74,7 +75,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp index 8f12f5b6..62b81385 100644 --- a/cpp/examples/ExampleProducerWithTimedMessage.cpp +++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp @@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -75,7 +76,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp index befb18ca..13d7f046 100644 --- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp +++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -79,7 +80,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .withTransactionChecker(checker) diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 1e20b2ee..ab106cb7 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -30,6 +30,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -58,7 +59,7 @@ int main(int argc, char* argv[]) { .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 4c30214f..17a84b78 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -29,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -51,7 +52,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .subscribe(FLAGS_topic, tag) .build(); diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 5865dbb2..643d3741 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -24,12 +24,9 @@ #include <utility> #include <vector> -#include "apache/rocketmq/v2/definition.pb.h" #include "InvocationContext.h" #include "LogInterceptor.h" #include "LogInterceptorFactory.h" -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" #include "MessageExt.h" #include "MetadataConstants.h" #include "MixAll.h" @@ -40,18 +37,22 @@ #include "Scheduler.h" #include "TlsHelper.h" #include "UtilAll.h" +#include "apache/rocketmq/v2/definition.pb.h" #include "google/protobuf/util/time_util.h" #include "grpcpp/create_channel.h" #include "rocketmq/ErrorCode.h" +#include "rocketmq/Logger.h" #include "rocketmq/SendReceipt.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl) - : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)), + : scheduler_(std::make_shared<SchedulerImpl>()), + resource_namespace_(std::move(resource_namespace)), state_(State::CREATED), callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())), - withSsl_(withSsl){ + withSsl_(withSsl) { certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -175,8 +176,10 @@ std::vector<std::string> ClientManagerImpl::cleanOfflineRpcClients() { return removed; } -void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata& metadata, - const HeartbeatRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::heartbeat(const std::string& target_host, + const Metadata& metadata, + const HeartbeatRequest& request, + std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) { SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString()); auto client = getRpcClient(target_host, true); @@ -279,7 +282,9 @@ void ClientManagerImpl::doHeartbeat() { } } -bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, +bool ClientManagerImpl::send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, SendCallback cb) { assert(cb); SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString()); @@ -390,7 +395,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met ec = ErrorCode::Unauthorized; break; } - + case rmq::Code::FORBIDDEN: { SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::Forbidden; @@ -440,7 +445,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met } case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: { - SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, invocation_context->response.DebugString()); + SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, + invocation_context->response.DebugString()); ec = ErrorCode::MessagePropertyConflictWithType; break; } @@ -470,7 +476,8 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories; interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories)); + target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, + std::move(interceptor_factories)); return channel; } @@ -514,7 +521,8 @@ void ClientManagerImpl::cleanRpcClients() { } SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, - const SendMessageResponse& response, std::error_code& ec) { + const SendMessageResponse& response, + std::error_code& ec) { SendReceipt send_receipt; switch (response.status().code()) { @@ -541,8 +549,10 @@ void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) { clients_.emplace_back(std::move(client)); } -void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata, - const QueryRouteRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::resolveRoute(const std::string& target_host, + const Metadata& metadata, + const QueryRouteRequest& request, + std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) { SPDLOG_DEBUG("Name server connection URL: {}", target_host); SPDLOG_DEBUG("Query route request: {}", request.DebugString()); @@ -646,7 +656,9 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad } void ClientManagerImpl::queryAssignment( - const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request, + const std::string& target, + const Metadata& metadata, + const QueryAssignmentRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) { SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target); @@ -748,8 +760,10 @@ void ClientManagerImpl::queryAssignment( client->asyncQueryAssignment(request, invocation_context); } -void ClientManagerImpl::receiveMessage(const std::string& target_host, const Metadata& metadata, - const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::receiveMessage(const std::string& target_host, + const Metadata& metadata, + const ReceiveMessageRequest& request, + std::chrono::milliseconds timeout, ReceiveMessageCallback cb) { SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -765,7 +779,6 @@ State ClientManagerImpl::state() const { } MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) { - assert(item.topic().resource_namespace() == resource_namespace_); auto builder = Message::newBuilder(); // base @@ -955,8 +968,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() { return scheduler_; } -void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, const AckMessageRequest& request, - std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) { +void ClientManagerImpl::ack(const std::string& target, + const Metadata& metadata, + const AckMessageRequest& request, + std::chrono::milliseconds timeout, + const std::function<void(const std::error_code&)>& cb) { std::string target_host(target.data(), target.length()); SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. AckMessageRequest: {}", target_host, request.DebugString()); @@ -1066,8 +1082,11 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, } void ClientManagerImpl::changeInvisibleDuration( - const std::string& target_host, const Metadata& metadata, const ChangeInvisibleDurationRequest& request, - std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& completion_callback) { + const std::string& target_host, + const Metadata& metadata, + const ChangeInvisibleDurationRequest& request, + std::chrono::milliseconds timeout, + const std::function<void(const std::error_code&)>& completion_callback) { RpcClientSharedPtr client = getRpcClient(target_host); assert(client); auto invocation_context = new InvocationContext<ChangeInvisibleDurationResponse>(); @@ -1133,7 +1152,7 @@ void ClientManagerImpl::changeInvisibleDuration( ec = ErrorCode::Forbidden; break; } - + case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::InternalServerError; @@ -1159,7 +1178,9 @@ void ClientManagerImpl::changeInvisibleDuration( } void ClientManagerImpl::endTransaction( - const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request, + const std::string& target_host, + const Metadata& metadata, + const EndTransactionRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) { RpcClientSharedPtr client = getRpcClient(target_host); @@ -1339,7 +1360,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe ec = ErrorCode::ServiceUnavailable; break; } - + case rmq::Code::TOO_MANY_REQUESTS: { ec = ErrorCode::TooManyRequests; break; @@ -1362,7 +1383,8 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe client->asyncForwardMessageToDeadLetterQueue(request, invocation_context); } -std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata, +std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, + const Metadata& metadata, const NotifyClientTerminationRequest& request, std::chrono::milliseconds timeout) { std::error_code ec; @@ -1446,4 +1468,4 @@ void ClientManagerImpl::submit(std::function<void()> task) { const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task"; const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task"; -ROCKETMQ_NAMESPACE_END \ No newline at end of file +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index 6c5b2f93..a55a7473 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -22,13 +22,13 @@ #include <utility> #include "ClientManager.h" -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" #include "MessageExt.h" #include "Metadata.h" #include "RpcClient.h" #include "Signature.h" #include "google/protobuf/util/time_util.h" +#include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN @@ -247,10 +247,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std } void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { + // The server may have implicitly assumed a namespace for the client. + if (!settings.publishing().topics().empty()) { + for (const auto& topic : settings.publishing().topics()) { + if (topic.resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + topic.resource_namespace()); + client->config().resource_namespace = topic.resource_namespace(); + break; + } + } + } client->config().publisher.max_body_size = settings.publishing().max_body_size(); } void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { + // The server may have implicitly assumed a namespace for the client. + if (!settings.subscription().subscriptions().empty()) { + for (const auto& subscription : settings.subscription().subscriptions()) { + if (subscription.topic().resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + subscription.topic().resource_namespace()); + client->config().resource_namespace = subscription.topic().resource_namespace(); + break; + } + } + } + client->config().subscriber.fifo = settings.subscription().fifo(); auto polling_timeout = google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout()); @@ -273,6 +296,16 @@ void TelemetryBidiReactor::write(TelemetryCommand command) { void TelemetryBidiReactor::fireWrite() { SPDLOG_DEBUG("{}#fireWrite", peer_address_); + + { + absl::MutexLock lk(&stream_state_mtx_); + if (stream_state_ != StreamState::Active && stream_state_ != StreamState::Created) { + SPDLOG_WARN("TelemetryBidiReactor to {} is closed or half-closed, ignoring fireWrite event. stream-state={}", + peer_address_, static_cast<std::uint8_t>(stream_state_)); + return; + } + } + { absl::MutexLock lk(&writes_mtx_); if (writes_.empty()) { diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 9fe65f31..aba116eb 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -75,6 +75,8 @@ private: /** * @brief Buffered commands to write to server + * + * TODO: move buffered commands to a shared container, which may survive multiple TelemetryBidiReactor lifecycles. */ std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_); absl::Mutex writes_mtx_; diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 45aa61b9..7fc63b6d 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -25,7 +25,7 @@ using namespace std::chrono; ROCKETMQ_NAMESPACE_BEGIN -class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> { +class SimpleConsumerImpl : virtual public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> { public: SimpleConsumerImpl(std::string group); diff --git a/cpp/tools/gen_compile_commands.sh b/cpp/tools/gen_compile_commands.sh new file mode 100755 index 00000000..c1dd541b --- /dev/null +++ b/cpp/tools/gen_compile_commands.sh @@ -0,0 +1,5 @@ +TOOLS_DIR=$(dirname "$0") +WORKSPACE_DIR=$(dirname "$TOOLS_DIR") +cd $WORKSPACE_DIR + +bazel run @hedron_compile_commands//:refresh_all \ No newline at end of file
