This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop-cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 99c505a6f26678db9d9d29878276324106dd797e
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Mar 14 11:12:21 2024 +0800

    Fix C++ SDK core dump issue (#2)
    
    * fix: sync namespace from server Settings
    
    * feat: use opentelemetry for tracing/metrics/logging
    
    * Remove broken links and add targets to generate compile_commands.json
    
    Signed-off-by: lizhanhui <[email protected]>
    
    * fix: timer task may invoke a call to a destructing stream
    
    Signed-off-by: Zhanhui Li <[email protected]>
    
    * fix: update document as we have changed the way to generate compile_commands.json
    
    * fix: static_cast StreamState to std::uint8_t as enum class by default is not formattable
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    ---------
    
    Signed-off-by: lizhanhui <[email protected]>
    Signed-off-by: Zhanhui Li <[email protected]>
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/CMakeLists.txt                                 |  4 +-
 cpp/README.md                                      | 25 ++++---
 cpp/WORKSPACE                                      | 26 +++++++-
 cpp/bazel/rocketmq_deps.bzl                        | 25 +++----
 cpp/examples/ExampleProducer.cpp                   |  3 +-
 cpp/examples/ExampleProducerWithAsync.cpp          |  3 +-
 cpp/examples/ExampleProducerWithFifoMessage.cpp    |  3 +-
 cpp/examples/ExampleProducerWithTimedMessage.cpp   |  3 +-
 .../ExampleProducerWithTransactionalMessage.cpp    |  3 +-
 cpp/examples/ExamplePushConsumer.cpp               |  3 +-
 cpp/examples/ExampleSimpleConsumer.cpp             |  3 +-
 cpp/source/client/ClientManagerImpl.cpp            | 76 ++++++++++++++--------
 cpp/source/client/TelemetryBidiReactor.cpp         | 37 ++++++++++-
 cpp/source/client/include/TelemetryBidiReactor.h   |  2 +
 cpp/source/rocketmq/include/SimpleConsumerImpl.h   |  2 +-
 cpp/tools/gen_compile_commands.sh                  |  5 ++
 16 files changed, 154 insertions(+), 69 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 374466e6..42f7cd70 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.19)
+cmake_minimum_required(VERSION 3.16)
 project(rocketmq)
 set(CMAKE_CXX_STANDARD 11)
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@@ -31,4 +31,4 @@ if (BUILD_EXAMPLES)
     find_package(gflags REQUIRED)
     find_package(ZLIB REQUIRED)
     add_subdirectory(examples)
-endif ()
\ No newline at end of file
+endif ()
diff --git a/cpp/README.md b/cpp/README.md
index 117a2426..2ed65c2e 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -217,19 +217,11 @@ if "com_google_googletest" not in native.existing_rules():
 1. VSCode + Clangd
 
    [Clangd](https://clangd.llvm.org/) is a really nice code completion tool. 
Clangd requires compile_commands.json to work properly.
-   To generate the file, we need clone another repository along with the 
current one.
-
-   ```sh
-   git clone git@github.com:grailbio/bazel-compilation-database.git
-   ```
-
-   From current repository root,
-
+   To generate the file, run the following command:
    ```sh
-   ../bazel-compilation-database/generate.sh
+    ./tools/gen_compile_commands.sh
    ```
-
-   Once the script completes, you should have compile_commands.json file in 
the repository root directory.
+   Once the script completes, you should have compile_commands.json file in 
the workspace directory, aka, ${repository}/cpp.
 
    LLVM project has an extension for 
[clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd).
 Please install it from the extension market.
 
@@ -239,8 +231,15 @@ if "com_google_googletest" not in native.existing_rules():
       "C_Cpp.intelliSenseEngine": "Disabled",
       "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from 
both extensions.
       "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles 
from both extensions (clangd's seem to be more reliable anyway).
-      "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd",
-      "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"],
+      "clangd.path": "/usr/bin/clangd",
+      "clangd.arguments": [
+         "-log=verbose",
+         "-pretty",
+         "--background-index",
+         "--header-insertion=never",
+         "--compile-commands-dir=${workspaceFolder}/",
+         "--query-driver=**"
+      ],
       "clangd.onConfigChanged": "restart",
    ```
 
diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE
index d09dd445..3c3d6476 100644
--- a/cpp/WORKSPACE
+++ b/cpp/WORKSPACE
@@ -27,4 +27,28 @@ http_archive(
 load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps")
 buildbuddy_deps()
 load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy")
-buildbuddy(name = "buildbuddy_toolchain")
\ No newline at end of file
+buildbuddy(name = "buildbuddy_toolchain")
+
+# Generate compile_commands.json
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+
+# Hedron's Compile Commands Extractor for Bazel
+# https://github.com/hedronvision/bazel-compile-commands-extractor
+http_archive(
+    name = "hedron_compile_commands",
+
+    # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in 
both places (below) with the latest 
(https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main),
 rather than using the stale one here.
+    # Even better, set up Renovate and let it do the work for you (see 
"Suggestion: Updates" in the README).
+    url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz",
+    strip_prefix = 
"bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9",
+    # When you first run this tool, it'll recommend a sha256 hash to put here 
with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a 
canonical reproducible form can be obtained by modifying arguments sha256 = ..."
+)
+load("@hedron_compile_commands//:workspace_setup.bzl", 
"hedron_compile_commands_setup")
+hedron_compile_commands_setup()
+load("@hedron_compile_commands//:workspace_setup_transitive.bzl", 
"hedron_compile_commands_setup_transitive")
+hedron_compile_commands_setup_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", 
"hedron_compile_commands_setup_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl",
 "hedron_compile_commands_setup_transitive_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive_transitive()
\ No newline at end of file
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index eae31a6f..684e55eb 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -16,7 +16,6 @@ def rocketmq_deps():
         sha256 = 
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
         strip_prefix = "googletest-release-1.11.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
             "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
         ],
     )
@@ -27,7 +26,6 @@ def rocketmq_deps():
         strip_prefix = "filesystem-1.5.0",
         sha256 = 
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
             "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
@@ -39,7 +37,6 @@ def rocketmq_deps():
         strip_prefix = "spdlog-1.9.2",
         sha256 = 
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
             "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
@@ -51,7 +48,6 @@ def rocketmq_deps():
         strip_prefix = "fmt-8.0.1",
         sha256 = 
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
             "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
@@ -63,7 +59,6 @@ def rocketmq_deps():
         sha256 = 
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
         strip_prefix = "protobuf-3.20.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz",
             "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz",
         ],
     )
@@ -74,7 +69,6 @@ def rocketmq_deps():
         sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
         strip_prefix = "rules_proto_grpc-4.1.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz",
             "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz",
         ],
     )
@@ -84,7 +78,6 @@ def rocketmq_deps():
         name = "io_opencensus_cpp",
         sha256 = 
"317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz",
             "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz",
         ],
         strip_prefix = "opencensus-cpp-0.4.1",
@@ -96,7 +89,6 @@ def rocketmq_deps():
         sha256 = 
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
         strip_prefix = "abseil-cpp-20211102.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz",
             "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz",
         ],
     )
@@ -107,7 +99,6 @@ def rocketmq_deps():
         strip_prefix = "gflags-2.2.2",
         sha256 = 
"34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz",
             "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz",
         ],
     )
@@ -118,7 +109,6 @@ def rocketmq_deps():
         strip_prefix = "grpc-1.46.3",
         sha256 = 
"d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
             "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
         ],
     )
@@ -130,7 +120,6 @@ def rocketmq_deps():
         build_file = "@org_apache_rocketmq//third_party:asio.BUILD",
         strip_prefix = "asio-1.18.2",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz",
             "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz",
         ],
     )
@@ -140,7 +129,6 @@ def rocketmq_deps():
         name = "com_google_googleapis",
         sha256 = 
"e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
             "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
         ],
         strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
@@ -152,7 +140,6 @@ def rocketmq_deps():
         sha256 = 
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
         strip_prefix = "rules_python-0.8.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz",
             "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz",
         ],
     )
@@ -161,7 +148,6 @@ def rocketmq_deps():
         http_archive,
         name = "rules_swift",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz",
             "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz",
         ],
         strip_prefix = "rules_swift-0.27.0",
@@ -172,7 +158,6 @@ def rocketmq_deps():
         name = "io_bazel_rules_go",
         sha256 = 
"685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip",
             "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
             "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
         ],
@@ -184,7 +169,15 @@ def rocketmq_deps():
         sha256 = 
"e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d",
         strip_prefix = "rules_proto-4.0.0-3.20.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz",
             "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz",
         ],
     )
+
+    maybe(
+        http_archive,
+        name = "com_github_opentelemetry",
+        strip_prefix = "opentelemetry-cpp-1.14.2",
+        urls = [
+            "https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz"
+        ]
+    )
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 57293c24..2e170ce5 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -77,7 +78,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp 
b/cpp/examples/ExampleProducerWithAsync.cpp
index 62ee7781..5e9cc12d 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -97,6 +97,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to 
publish");
 DEFINE_uint32(concurrency, 128, "Concurrency of async send");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -116,7 +117,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp 
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index 09b8d407..f45b2d12 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -74,7 +75,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp 
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index 8f12f5b6..62b81385 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -75,7 +76,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp 
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index befb18ca..13d7f046 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -79,7 +80,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              
.withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .withTransactionChecker(checker)
diff --git a/cpp/examples/ExamplePushConsumer.cpp 
b/cpp/examples/ExamplePushConsumer.cpp
index 1e20b2ee..ab106cb7 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -30,6 +30,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service 
access URL, provide
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through 
your instance management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -58,7 +59,7 @@ int main(int argc, char* argv[]) {
                                                   
.withEndpoints(FLAGS_access_point)
                                                   
.withRequestTimeout(std::chrono::seconds(3))
                                                   
.withCredentialsProvider(credentials_provider)
-                                                  .withSsl(true)
+                                                  .withSsl(FLAGS_tls)
                                                   .build())
                            .withConsumeThreads(4)
                            .withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp 
b/cpp/examples/ExampleSimpleConsumer.cpp
index 4c30214f..17a84b78 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -29,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service 
access URL, provide
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through 
your instance management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -51,7 +52,7 @@ int main(int argc, char* argv[]) {
                              .withConfiguration(Configuration::newBuilder()
                                                     
.withEndpoints(FLAGS_access_point)
                                                     
.withCredentialsProvider(credentials_provider)
-                                                    .withSsl(true)
+                                                    .withSsl(FLAGS_tls)
                                                     .build())
                              .subscribe(FLAGS_topic, tag)
                              .build();
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index 5865dbb2..643d3741 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -24,12 +24,9 @@
 #include <utility>
 #include <vector>
 
-#include "apache/rocketmq/v2/definition.pb.h"
 #include "InvocationContext.h"
 #include "LogInterceptor.h"
 #include "LogInterceptorFactory.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "MessageExt.h"
 #include "MetadataConstants.h"
 #include "MixAll.h"
@@ -40,18 +37,22 @@
 #include "Scheduler.h"
 #include "TlsHelper.h"
 #include "UtilAll.h"
+#include "apache/rocketmq/v2/definition.pb.h"
 #include "google/protobuf/util/time_util.h"
 #include "grpcpp/create_channel.h"
 #include "rocketmq/ErrorCode.h"
+#include "rocketmq/Logger.h"
 #include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
 ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool 
withSsl)
-    : scheduler_(std::make_shared<SchedulerImpl>()), 
resource_namespace_(std::move(resource_namespace)),
+    : scheduler_(std::make_shared<SchedulerImpl>()),
+      resource_namespace_(std::move(resource_namespace)),
       state_(State::CREATED),
       
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
-      withSsl_(withSsl){
+      withSsl_(withSsl) {
   certificate_verifier_ = 
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
   tls_channel_credential_options_.set_verify_server_certs(false);
   tls_channel_credential_options_.set_check_call_host(false);
@@ -175,8 +176,10 @@ std::vector<std::string> 
ClientManagerImpl::cleanOfflineRpcClients() {
   return removed;
 }
 
-void ClientManagerImpl::heartbeat(const std::string& target_host, const 
Metadata& metadata,
-                                  const HeartbeatRequest& request, 
std::chrono::milliseconds timeout,
+void ClientManagerImpl::heartbeat(const std::string& target_host,
+                                  const Metadata& metadata,
+                                  const HeartbeatRequest& request,
+                                  std::chrono::milliseconds timeout,
                                   const std::function<void(const 
std::error_code&, const HeartbeatResponse&)>& cb) {
   SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, 
request.DebugString());
   auto client = getRpcClient(target_host, true);
@@ -279,7 +282,9 @@ void ClientManagerImpl::doHeartbeat() {
   }
 }
 
-bool ClientManagerImpl::send(const std::string& target_host, const Metadata& 
metadata, SendMessageRequest& request,
+bool ClientManagerImpl::send(const std::string& target_host,
+                             const Metadata& metadata,
+                             SendMessageRequest& request,
                              SendCallback cb) {
   assert(cb);
   SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", 
target_host, request.DebugString());
@@ -390,7 +395,7 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
         ec = ErrorCode::Unauthorized;
         break;
       }
-        
+
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), 
invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
@@ -440,7 +445,8 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
       }
 
       case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
-        SPDLOG_WARN("Message-property-conflict-with-type: Host={}, 
Response={}", invocation_context->remote_address, 
invocation_context->response.DebugString());
+        SPDLOG_WARN("Message-property-conflict-with-type: Host={}, 
Response={}", invocation_context->remote_address,
+                    invocation_context->response.DebugString());
         ec = ErrorCode::MessagePropertyConflictWithType;
         break;
       }
@@ -470,7 +476,8 @@ std::shared_ptr<grpc::Channel> 
ClientManagerImpl::createChannel(const std::strin
   
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
 interceptor_factories;
   
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
   auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-      target_host, withSsl_ ? channel_credential_ : 
grpc::InsecureChannelCredentials(), channel_arguments_, 
std::move(interceptor_factories));
+      target_host, withSsl_ ? channel_credential_ : 
grpc::InsecureChannelCredentials(), channel_arguments_,
+      std::move(interceptor_factories));
   return channel;
 }
 
@@ -514,7 +521,8 @@ void ClientManagerImpl::cleanRpcClients() {
 }
 
 SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& 
message_queue,
-                                                   const SendMessageResponse& 
response, std::error_code& ec) {
+                                                   const SendMessageResponse& 
response,
+                                                   std::error_code& ec) {
   SendReceipt send_receipt;
 
   switch (response.status().code()) {
@@ -541,8 +549,10 @@ void 
ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
   clients_.emplace_back(std::move(client));
 }
 
-void ClientManagerImpl::resolveRoute(const std::string& target_host, const 
Metadata& metadata,
-                                     const QueryRouteRequest& request, 
std::chrono::milliseconds timeout,
+void ClientManagerImpl::resolveRoute(const std::string& target_host,
+                                     const Metadata& metadata,
+                                     const QueryRouteRequest& request,
+                                     std::chrono::milliseconds timeout,
                                      const std::function<void(const 
std::error_code&, const TopicRouteDataPtr&)>& cb) {
   SPDLOG_DEBUG("Name server connection URL: {}", target_host);
   SPDLOG_DEBUG("Query route request: {}", request.DebugString());
@@ -646,7 +656,9 @@ void ClientManagerImpl::resolveRoute(const std::string& 
target_host, const Metad
 }
 
 void ClientManagerImpl::queryAssignment(
-    const std::string& target, const Metadata& metadata, const 
QueryAssignmentRequest& request,
+    const std::string& target,
+    const Metadata& metadata,
+    const QueryAssignmentRequest& request,
     std::chrono::milliseconds timeout,
     const std::function<void(const std::error_code&, const 
QueryAssignmentResponse&)>& cb) {
   SPDLOG_DEBUG("Prepare to send query assignment request to 
broker[address={}]", target);
@@ -748,8 +760,10 @@ void ClientManagerImpl::queryAssignment(
   client->asyncQueryAssignment(request, invocation_context);
 }
 
-void ClientManagerImpl::receiveMessage(const std::string& target_host, const 
Metadata& metadata,
-                                       const ReceiveMessageRequest& request, 
std::chrono::milliseconds timeout,
+void ClientManagerImpl::receiveMessage(const std::string& target_host,
+                                       const Metadata& metadata,
+                                       const ReceiveMessageRequest& request,
+                                       std::chrono::milliseconds timeout,
                                        ReceiveMessageCallback cb) {
   SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: 
{}", target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -765,7 +779,6 @@ State ClientManagerImpl::state() const {
 }
 
 MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) 
{
-  assert(item.topic().resource_namespace() == resource_namespace_);
   auto builder = Message::newBuilder();
 
   // base
@@ -955,8 +968,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() {
   return scheduler_;
 }
 
-void ClientManagerImpl::ack(const std::string& target, const Metadata& 
metadata, const AckMessageRequest& request,
-                            std::chrono::milliseconds timeout, const 
std::function<void(const std::error_code&)>& cb) {
+void ClientManagerImpl::ack(const std::string& target,
+                            const Metadata& metadata,
+                            const AckMessageRequest& request,
+                            std::chrono::milliseconds timeout,
+                            const std::function<void(const std::error_code&)>& 
cb) {
   std::string target_host(target.data(), target.length());
   SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. 
AckMessageRequest: {}", target_host,
                request.DebugString());
@@ -1066,8 +1082,11 @@ void ClientManagerImpl::ack(const std::string& target, 
const Metadata& metadata,
 }
 
 void ClientManagerImpl::changeInvisibleDuration(
-    const std::string& target_host, const Metadata& metadata, const 
ChangeInvisibleDurationRequest& request,
-    std::chrono::milliseconds timeout, const std::function<void(const 
std::error_code&)>& completion_callback) {
+    const std::string& target_host,
+    const Metadata& metadata,
+    const ChangeInvisibleDurationRequest& request,
+    std::chrono::milliseconds timeout,
+    const std::function<void(const std::error_code&)>& completion_callback) {
   RpcClientSharedPtr client = getRpcClient(target_host);
   assert(client);
   auto invocation_context = new 
InvocationContext<ChangeInvisibleDurationResponse>();
@@ -1133,7 +1152,7 @@ void ClientManagerImpl::changeInvisibleDuration(
         ec = ErrorCode::Forbidden;
         break;
       }
-        
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), 
invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
@@ -1159,7 +1178,9 @@ void ClientManagerImpl::changeInvisibleDuration(
 }
 
 void ClientManagerImpl::endTransaction(
-    const std::string& target_host, const Metadata& metadata, const 
EndTransactionRequest& request,
+    const std::string& target_host,
+    const Metadata& metadata,
+    const EndTransactionRequest& request,
     std::chrono::milliseconds timeout,
     const std::function<void(const std::error_code&, const 
EndTransactionResponse&)>& cb) {
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -1339,7 +1360,7 @@ void 
ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
         ec = ErrorCode::ServiceUnavailable;
         break;
       }
-        
+
       case rmq::Code::TOO_MANY_REQUESTS: {
         ec = ErrorCode::TooManyRequests;
         break;
@@ -1362,7 +1383,8 @@ void 
ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
 }
 
-std::error_code ClientManagerImpl::notifyClientTermination(const std::string& 
target_host, const Metadata& metadata,
+std::error_code ClientManagerImpl::notifyClientTermination(const std::string& 
target_host,
+                                                           const Metadata& 
metadata,
                                                            const 
NotifyClientTerminationRequest& request,
                                                            
std::chrono::milliseconds timeout) {
   std::error_code ec;
@@ -1446,4 +1468,4 @@ void ClientManagerImpl::submit(std::function<void()> 
task) {
 const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
 const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index 6c5b2f93..a55a7473 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -22,13 +22,13 @@
 #include <utility>
 
 #include "ClientManager.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "MessageExt.h"
 #include "Metadata.h"
 #include "RpcClient.h"
 #include "Signature.h"
 #include "google/protobuf/util/time_util.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -247,10 +247,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const 
rmq::Settings& settings, std
 }
 
 void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& 
settings, std::shared_ptr<Client> client) {  // Copies the publishing part of `settings` into client->config().
+  // The server may have implicitly assumed a namespace for the client; adopt the namespace of the first topic that differs.
+  if (!settings.publishing().topics().empty()) {  // NOTE(review): guard looks redundant, the loop below does nothing for an empty list
+    for (const auto& topic : settings.publishing().topics()) {
+      if (topic.resource_namespace() != client->config().resource_namespace) {
+        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", 
client->config().resource_namespace,
+                    topic.resource_namespace());
+        client->config().resource_namespace = topic.resource_namespace();  // overwrite the locally configured namespace
+        break;  // only the first mismatching topic is considered
+      }
+    }
+  }
   client->config().publisher.max_body_size = 
settings.publishing().max_body_size();  // applied unconditionally, whether or not the namespace changed
 }
 
 void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& 
settings, std::shared_ptr<Client> client) {
+  // The server may have implicitly assumed a namespace for the client.
+  if (!settings.subscription().subscriptions().empty()) {
+    for (const auto& subscription : settings.subscription().subscriptions()) {
+      if (subscription.topic().resource_namespace() != 
client->config().resource_namespace) {
+        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", 
client->config().resource_namespace,
+                    subscription.topic().resource_namespace());
+        client->config().resource_namespace = 
subscription.topic().resource_namespace();
+        break;
+      }
+    }
+  }
+
   client->config().subscriber.fifo = settings.subscription().fifo();
   auto polling_timeout =
       
google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout());
@@ -273,6 +296,16 @@ void TelemetryBidiReactor::write(TelemetryCommand command) 
{
 
 void TelemetryBidiReactor::fireWrite() {
   SPDLOG_DEBUG("{}#fireWrite", peer_address_);
+
+  {
+    absl::MutexLock lk(&stream_state_mtx_);
+    if (stream_state_ != StreamState::Active && stream_state_ != 
StreamState::Created) {
+      SPDLOG_WARN("TelemetryBidiReactor to {} is closed or half-closed, 
ignoring fireWrite event. stream-state={}",
+                  peer_address_, static_cast<std::uint8_t>(stream_state_));
+      return;
+    }
+  }
+
   {
     absl::MutexLock lk(&writes_mtx_);
     if (writes_.empty()) {
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h 
b/cpp/source/client/include/TelemetryBidiReactor.h
index 9fe65f31..aba116eb 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -75,6 +75,8 @@ private:
 
   /**
    * @brief Buffered commands to write to server
+   *
+   * TODO: move buffered commands to a shared container, which may survive 
multiple TelemetryBidiReactor lifecycles.
    */
   std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
   absl::Mutex writes_mtx_;
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h 
b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 45aa61b9..7fc63b6d 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -25,7 +25,7 @@
 using namespace std::chrono;
 ROCKETMQ_NAMESPACE_BEGIN
 
-class SimpleConsumerImpl : public ClientImpl, public 
std::enable_shared_from_this<SimpleConsumerImpl> {
+class SimpleConsumerImpl : virtual public ClientImpl, public 
std::enable_shared_from_this<SimpleConsumerImpl> {
 public:
   SimpleConsumerImpl(std::string group);
 
diff --git a/cpp/tools/gen_compile_commands.sh 
b/cpp/tools/gen_compile_commands.sh
new file mode 100755
index 00000000..c1dd541b
--- /dev/null
+++ b/cpp/tools/gen_compile_commands.sh
@@ -0,0 +1,5 @@
+TOOLS_DIR=$(dirname "$0")  # Directory holding this script; the script then runs the compile-commands refresh from its parent directory.
+WORKSPACE_DIR=$(CDPATH= cd -- "$TOOLS_DIR/.." && pwd) || exit 1  # Resolve the parent as an absolute path: dirname of "." is still ".", which broke `./gen_compile_commands.sh`.
+cd "$WORKSPACE_DIR" || exit 1  # Quoted so paths containing spaces survive; abort rather than run bazel from the wrong directory.
+
+bazel run @hedron_compile_commands//:refresh_all  # Runs the refresh_all target of the hedron_compile_commands repository (presumably regenerates compile_commands.json, per the script name).
\ No newline at end of file


Reply via email to