This is an automated email from the ASF dual-hosted git repository.
lollipopjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f1df051c [C++] Improve CMake build system, fix warnings, and restore
cross-platform CI (#1257)
f1df051c is described below
commit f1df051ce390e0bbb2273a1330736eddb0a26c56
Author: lizhimins <[email protected]>
AuthorDate: Mon Jun 8 10:27:46 2026 +0800
[C++] Improve CMake build system, fix warnings, and restore cross-platform
CI (#1257)
---
.github/workflows/cpp_build.yml | 86 ++-
.github/workflows/cpp_coverage.yml | 2 +
.licenserc.yaml | 1 +
cpp/CMakeLists.txt | 52 +-
cpp/README.md | 321 +++++-------
cpp/docs/assets/BasicMode.png | Bin 42779 -> 0 bytes
cpp/docs/assets/class_diagram.png | Bin 364206 -> 0 bytes
cpp/proto/apache/rocketmq/v2/admin.proto | 44 +-
cpp/proto/apache/rocketmq/v2/definition.proto | 583 +--------------------
cpp/proto/apache/rocketmq/v2/service.proto | 439 +---------------
cpp/source/CMakeLists.txt | 10 +-
cpp/source/admin/tests/CMakeLists.txt | 4 +
cpp/source/base/include/FmtEnumFormatter.h | 76 +++
cpp/source/base/tests/CMakeLists.txt | 16 +
cpp/source/client/ClientManagerImpl.cpp | 1 +
cpp/source/client/ReceiveMessageStreamReader.cpp | 1 +
cpp/source/client/TelemetryBidiReactor.cpp | 1 +
cpp/source/client/tests/CMakeLists.txt | 19 +
cpp/source/concurrent/tests/CMakeLists.txt | 4 +
cpp/source/concurrent/tests/CountdownLatchTest.cpp | 1 +
cpp/source/rocketmq/ClientImpl.cpp | 1 +
cpp/source/rocketmq/ProducerImpl.cpp | 1 +
cpp/source/rocketmq/PushConsumerImpl.cpp | 1 +
cpp/source/rocketmq/tests/CMakeLists.txt | 19 +
cpp/source/scheduler/tests/CMakeLists.txt | 4 +
cpp/source/stats/MetricBidiReactor.cpp | 1 +
cpp/source/stats/tests/CMakeLists.txt | 4 +
27 files changed, 413 insertions(+), 1279 deletions(-)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 2272b2b0..7b3e7a88 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,5 +1,5 @@
name: CPP Build
-on:
+on:
workflow_call:
jobs:
build:
@@ -8,33 +8,67 @@ jobs:
strategy:
fail-fast: false
matrix:
- # Disable VS 2022 before
https://github.com/bazelbuild/bazel/issues/18592 issue is solved
- # Remove macos-11 since there is no such runner available
- # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019,
windows-2022]
- os: [ubuntu-22.04]
+ os: [ubuntu-22.04, macos-latest, windows-2022]
steps:
- - uses: actions/checkout@v2
- - name: Compile On Linux
- working-directory: ./cpp
+ - uses: actions/checkout@v3
+ with:
+ submodules: true
+
+ - name: Install dependencies (Linux)
if: runner.os == 'Linux'
- run: bazel build //...
- - name: Compile On Windows
- working-directory: ./cpp
- if: runner.os == 'Windows'
- run: bazel build //...
- - name: Compile On macOS
- working-directory: ./cpp
+ run: sudo apt-get update && sudo apt-get install -y libssl-dev
zlib1g-dev
+
+ - name: Install dependencies (macOS)
if: runner.os == 'macOS'
- run: bazel build //...
- - name: Run Unit Tests On Linux
- working-directory: ./cpp
- if: runner.os == 'Linux'
- run: bazel test //...
- - name: Run Unit Tests On Windows
- working-directory: ./cpp
+ run: brew install openssl zlib
+
+ - name: Install dependencies (Windows)
if: runner.os == 'Windows'
- run: bazel test --test_output=streamed //...
- - name: Run Unit Tests On macOS
+ run: choco install nasm
+
+ - name: Cache gRPC
+ id: cache-grpc
+ uses: actions/cache@v3
+ with:
+ path: ${{ runner.temp }}/grpc-install
+ key: grpc-1.46.3-${{ matrix.os }}
+
+ - name: Build gRPC from source
+ if: steps.cache-grpc.outputs.cache-hit != 'true'
+ shell: bash
+ run: |
+ TEMP_DIR="${{ runner.temp }}"
+ TEMP_DIR="${TEMP_DIR//\\//}"
+ ZLIB_PROVIDER=package
+ if [[ "$RUNNER_OS" == "Windows" ]]; then
+ ZLIB_PROVIDER=module
+ fi
+ git clone --recurse-submodules -b v1.46.3 --depth 1
https://github.com/grpc/grpc.git "$TEMP_DIR/grpc-src"
+ cmake -S "$TEMP_DIR/grpc-src" -B "$TEMP_DIR/grpc-build" \
+ -DCMAKE_BUILD_TYPE=Release \
+ -DCMAKE_INSTALL_PREFIX="$TEMP_DIR/grpc-install" \
+ -DCMAKE_POLICY_VERSION_MINIMUM=3.5 \
+ -DgRPC_INSTALL=ON \
+ -DgRPC_BUILD_TESTS=OFF \
+ -DgRPC_SSL_PROVIDER=package \
+ -DgRPC_ZLIB_PROVIDER=$ZLIB_PROVIDER
+ cmake --build "$TEMP_DIR/grpc-build" --parallel 4 --config Release
+ cmake --install "$TEMP_DIR/grpc-build" --config Release
+
+ - name: Build
working-directory: ./cpp
- if: runner.os == 'macOS'
- run: bazel test --test_output=errors //...
+ shell: bash
+ run: |
+ TEMP_DIR="${{ runner.temp }}"
+ TEMP_DIR="${TEMP_DIR//\\//}"
+ cmake -S . -B build \
+ -DCMAKE_BUILD_TYPE=Release \
+ -DCMAKE_PREFIX_PATH="$TEMP_DIR/grpc-install" \
+ -DCMAKE_POLICY_VERSION_MINIMUM=3.5 \
+ -DBUILD_EXAMPLES=OFF
+ cmake --build build --parallel 4 --config Release
+
+ - name: Run Unit Tests
+ working-directory: ./cpp/build
+ shell: bash
+ run: ctest --output-on-failure -j4 -C Release
diff --git a/.github/workflows/cpp_coverage.yml
b/.github/workflows/cpp_coverage.yml
index f9dcf77a..8cdeb5f7 100644
--- a/.github/workflows/cpp_coverage.yml
+++ b/.github/workflows/cpp_coverage.yml
@@ -9,6 +9,8 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
+ with:
+ submodules: true
- name: Generate coverage report
working-directory: ./cpp
run: bazel coverage //...
diff --git a/.licenserc.yaml b/.licenserc.yaml
index aab3cb38..a0c25620 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -48,6 +48,7 @@ header:
- 'cpp/WORKSPACE'
- 'cpp/.gitignore'
- 'cpp/third_party'
+ - 'cpp/proto/**'
- 'cpp/cmake'
- 'cpp/source/exports.map'
- 'php/grpc/**/*.php'
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 42f7cd70..444e2ceb 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -3,17 +3,15 @@ project(rocketmq)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
-set(gRPC_DEBUG ON)
-
-# Assume gRPC is installed $HOME/grpc
-list(APPEND CMAKE_PREFIX_PATH $ENV{HOME}/grpc)
-
list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)
find_package(protobuf CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
find_package(absl REQUIRED)
find_package(OpenSSL REQUIRED)
+if(NOT EXISTS
"${CMAKE_CURRENT_SOURCE_DIR}/proto/apache/rocketmq/v2/definition.proto")
+ message(FATAL_ERROR "Proto files not found. Run: git submodule update
--init --recursive")
+endif()
add_subdirectory(proto)
add_library(api INTERFACE)
@@ -25,10 +23,48 @@ add_subdirectory(source)
option(BUILD_EXAMPLES "Build example programs or not" ON)
if (BUILD_EXAMPLES)
- message("Would build examples")
- # Assume gflags is install in $HOME/gflags
- list(APPEND CMAKE_PREFIX_PATH $ENV{HOME}/gflags)
find_package(gflags REQUIRED)
find_package(ZLIB REQUIRED)
add_subdirectory(examples)
endif ()
+
+option(BUILD_TESTS "Build unit tests or not" ON)
+if (BUILD_TESTS)
+ enable_testing()
+ include(FetchContent)
+ FetchContent_Declare(
+ googletest
+ GIT_REPOSITORY https://github.com/google/googletest.git
+ GIT_TAG release-1.11.0
+ )
+ set(BUILD_GMOCK ON CACHE BOOL "" FORCE)
+ set(INSTALL_GTEST OFF CACHE BOOL "" FORCE)
+ FetchContent_MakeAvailable(googletest)
+
+ set(ROCKETMQ_INTERNAL_INCLUDE_DIRS
+ ${PROJECT_SOURCE_DIR}/include
+ ${PROJECT_SOURCE_DIR}/source/admin/include
+ ${PROJECT_SOURCE_DIR}/source/base/include
+ ${PROJECT_SOURCE_DIR}/source/client/include
+ ${PROJECT_SOURCE_DIR}/source/concurrent/include
+ ${PROJECT_SOURCE_DIR}/source/log/include
+ ${PROJECT_SOURCE_DIR}/source/rocketmq/include
+ ${PROJECT_SOURCE_DIR}/source/scheduler/include
+ ${PROJECT_SOURCE_DIR}/source/stats/include
+ ${PROJECT_SOURCE_DIR}/source/trace/include
+ ${PROJECT_SOURCE_DIR}/third_party/asio/1.18.2/include
+ ${PROJECT_SOURCE_DIR}/third_party/fmt/9.0.0/include
+ ${PROJECT_SOURCE_DIR}/third_party/spdlog/1.10.0/include
+ ${PROJECT_SOURCE_DIR}/third_party/filesystem/1.5.12/include
+ ${PROJECT_BINARY_DIR}/proto
+ )
+ set(ROCKETMQ_TEST_LINK_LIBS rocketmq proto gtest_main gmock)
+
+ add_subdirectory(source/admin/tests)
+ add_subdirectory(source/base/tests)
+ add_subdirectory(source/client/tests)
+ add_subdirectory(source/concurrent/tests)
+ add_subdirectory(source/rocketmq/tests)
+ add_subdirectory(source/scheduler/tests)
+ add_subdirectory(source/stats/tests)
+endif ()
diff --git a/cpp/README.md b/cpp/README.md
index a6d983fb..c555e027 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -4,250 +4,213 @@
## Introduction
-Apache RocketMQ supports two styles of APIs to acknowledge messages once they
are successfully processed.
+This is the C++ client for [Apache RocketMQ](https://rocketmq.apache.org/)
5.x, built on top of [gRPC](https://grpc.io/) and [Protocol
Buffers](https://developers.google.com/protocol-buffers). It follows the
[rocketmq-apis](https://github.com/apache/rocketmq-apis) specification and
provides Producer, FifoProducer, PushConsumer and SimpleConsumer APIs.
-1. Selective Acknowledgement
- For each logical message queue(aka, topic partition), SDK manages offsets
locally and periodically syncs committed offset to brokers in charge.
-2. Per Message Acknowledgement
- On consumption of each message, SDK acknowledge it to the broker instantly.
Broker is responsible of managing consuming progress.
+The proto definitions are shared via the `protos/` git submodule at the
repository root. Make sure to clone with `--recursive` or run `git submodule
update --init` before building.
-Either of them is widely adopted by products. Per message acknowledgement
simplifies SDK implementation while selective approach is more performant
considering that fewer RPCs are required.
+## Prerequisites
-## Transport Layer
+- C++ compiler supporting C++11
+- CMake 3.13+ or Bazel 5.2.0
+- gRPC — RPC communication framework, also brings in protobuf (serialization),
abseil (base library), and re2 (regex)
+- OpenSSL development headers — TLS encrypted communication
+- zlib development headers — message body compression
-This SDK is built on top of [gRPC](https://grpc.io/). [Protocol
Buffers](https://developers.google.com/protocol-buffers) is used to serialize
application messages.
+## How To Build
-## Type Hierarchy
+### Build with CMake (Recommended)
-Classes of this project are designed to be interface oriented.
-
-This paradigm makes dependency injection possible. DI is especially helpful
when writing unit tests.
+1. Install gRPC (v1.46.3) and its dependencies:
-## Core Concepts
-
-
-
-## Code Style
-
-Generally, we follow [Google C++ Code
Style](https://google.github.io/styleguide/cppguide.html). A few exceptions are
made to maintain API compatibility.
-
-1. C++ exception is only allowed in the outer wrapper classes, for example,
DefaultMQProducer, DefaultMQConsumer.
-2. C++ --std=c++11 is preferred. We intend to maintain the same compiler
compatibility matrix to [those of
gRPC](https://github.com/grpc/grpc/blob/master/BUILDING.md)
-3. Smart pointers are preferred where it makes sense. Use raw pointers only
when it is really necessary.
-
-## Dependency Management
-
-Considering SDK built on top of gRPC, ensure it is really necessary before
introducing a third-party library. Check [gRPC
deps](https://github.com/grpc/grpc/blob/master/bazel/grpc_deps.bzl) and [gRPC
extra deps](https://github.com/grpc/grpc/blob/master/bazel/grpc_extra_deps.bzl)
first!
-
-When introducing a third-party dependency or raising version of a dependency,
make sure it is back-off friendly. For example,
-
-```starlark
-if "com_google_googletest" not in native.existing_rules():
- http_archive(
- name = "com_google_googletest",
- sha256 =
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
- strip_prefix = "googletest-release-1.11.0",
- urls = [
-
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
- ],
- )
-```
-
-### How To Build
-
-#### Build with Bazel
-
-[Google Bazel](https://bazel.build/) is the primary build tool we supported,
Please follow [bazel installation
guide](https://docs.bazel.build/versions/main/install.html).
-
-1. Build
- From the workspace,
-
- ```starlark
- bazel build //...
+ ```shell
+ # Install system dependencies (pick your distro)
+ sudo apt install -y libssl-dev zlib1g-dev # Debian/Ubuntu
+ sudo yum install -y openssl-devel zlib-devel # CentOS/RHEL/Alibaba Cloud
Linux
+
+ # Clone gRPC source with submodules
+ git clone --recurse-submodules -b v1.46.3 --depth 1 \
+ https://github.com/grpc/grpc.git /tmp/grpc
+
+ # Build and install to /usr/local (system default)
+ cd /tmp/grpc && mkdir build && cd build
+ cmake -DCMAKE_BUILD_TYPE=Release \
+ -DgRPC_INSTALL=ON \
+ -DgRPC_BUILD_TESTS=OFF \
+ -DgRPC_SSL_PROVIDER=package \
+ -DgRPC_ZLIB_PROVIDER=package \
+ ..
+ make -j $(nproc)
+ sudo make install
```
-2. Run Unit Tests
- From the workspace,
+ Or install to a custom location (no sudo required):
- ```starlark
- bazel test //...
+ ```shell
+ cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc ...
+ make -j $(nproc) && make install
```
-#### Build with CMake
-
-1. Make sure you have installed a modern CMake 3.13+ and C++ compilation
toolchain that at least supports C++11;
+2. Install gflags (required for example programs):
-2. Following [gRPC installation
instructions](https://grpc.io/docs/languages/cpp/quickstart/) to install grpc.
+ ```shell
+ git clone -b v2.2.2 --depth 1 https://github.com/gflags/gflags.git
/tmp/gflags
+ cd /tmp/gflags && mkdir build && cd build
+ cmake -DCMAKE_BUILD_TYPE=Release ..
+ make -j $(nproc)
+ sudo make install
+ ```
- Note:
- * Remember to `export MY_INSTALL_DIR=$HOME/grpc` as our primary
CMakeLists.txt hints
+ If you don't need examples, skip this step and pass `-DBUILD_EXAMPLES=OFF`
when building.
- ```cmake
- list(APPEND CMAKE_PREFIX_PATH $ENV{HOME}/grpc)
- ```
+3. Build the project:
- If your grpc is installed somewhere else yet non-standard, please
adjust accordingly.
+ ```shell
+ cd cpp && mkdir build && cd build
+ cmake ..
+ make -j $(nproc)
+ ```
- * When configure grpc, use your pre-installed system package if possible;
+ If gRPC is installed to a non-default location, pass the path explicitly:
- ```shell
- cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc -DgRPC_SSL_PROVIDER=package
-DgRPC_ZLIB_PROVIDER=package
- ```
+ ```shell
+ cmake -DCMAKE_PREFIX_PATH=/path/to/grpc ..
+ ```
- A few more options are involved. Check CMakeLists.txt of grpc
+4. Run unit tests (from the build directory):
- ```cmake
- # Providers for third-party dependencies (gRPC_*_PROVIDER properties):
- # "module": build the dependency using sources from git submodule
(under third_party)
- # "package": use cmake's find_package functionality to locate a
pre-installed dependency
+ ```shell
+ cd build
+ ctest --output-on-failure -j$(nproc)
+ ```
- set(gRPC_ZLIB_PROVIDER "module" CACHE STRING "Provider of zlib
library")
- set_property(CACHE gRPC_ZLIB_PROVIDER PROPERTY STRINGS "module"
"package")
+5. CMake options:
- set(gRPC_CARES_PROVIDER "module" CACHE STRING "Provider of c-ares
library")
- set_property(CACHE gRPC_CARES_PROVIDER PROPERTY STRINGS "module"
"package")
+ | Option | Default | Description
|
+ | ---------------- | ------- |
---------------------------------------------------- |
+ | `BUILD_TESTS` | ON | Build unit tests (requires googletest,
auto-fetched) |
+ | `BUILD_EXAMPLES` | ON | Build example programs (requires gflags)
|
- set(gRPC_RE2_PROVIDER "module" CACHE STRING "Provider of re2 library")
- set_property(CACHE gRPC_RE2_PROVIDER PROPERTY STRINGS "module"
"package")
+ To skip tests and examples:
- set(gRPC_SSL_PROVIDER "module" CACHE STRING "Provider of ssl library")
- set_property(CACHE gRPC_SSL_PROVIDER PROPERTY STRINGS "module"
"package")
+ ```shell
+ cmake -DBUILD_TESTS=OFF -DBUILD_EXAMPLES=OFF ..
+ ```
- set(gRPC_PROTOBUF_PROVIDER "module" CACHE STRING "Provider of
protobuf library")
- set_property(CACHE gRPC_PROTOBUF_PROVIDER PROPERTY STRINGS "module"
"package")
+### Build with Bazel
- set(gRPC_PROTOBUF_PACKAGE_TYPE "" CACHE STRING "Algorithm for
searching protobuf package")
- set_property(CACHE gRPC_PROTOBUF_PACKAGE_TYPE PROPERTY STRINGS
"CONFIG" "MODULE")
+```shell
+cd cpp
+bazel build //...
+bazel test //...
+```
- if(gRPC_BUILD_TESTS)
- set(gRPC_BENCHMARK_PROVIDER "module" CACHE STRING "Provider of
benchmark library")
- set_property(CACHE gRPC_BENCHMARK_PROVIDER PROPERTY STRINGS "module"
"package")
- else()
- set(gRPC_BENCHMARK_PROVIDER "none")
- endif()
+## Testing
- set(gRPC_ABSL_PROVIDER "module" CACHE STRING "Provider of absl
library")
- set_property(CACHE gRPC_ABSL_PROVIDER PROPERTY STRINGS "module"
"package")
- ```
+### Run a single test case
-3. Example programs uses [gflags](https://github.com/gflags/gflags) to parse
command arguments. Please install it to $HOME/gflags
- as CMakeLists.txt has the following find package statements
+1. List all test cases:
- ```cmake
- # Assume gflags is install in $HOME/gflags
- list(APPEND CMAKE_PREFIX_PATH $ENV{HOME}/gflags)
- find_package(gflags REQUIRED)
+ ```shell
+ ./build/your_test --gtest_list_tests
```
-4. OpenSSL development package is also required.
-
-5. Run the following commands to build from ${YOUR_GIT_REPOSITORY}/cpp
directory
+2. Run a specific test case:
```shell
- mkdir build && cd build
- cmake -DOPENSSL_ROOT_DIR=/usr/local/Cellar/[email protected]/1.1.1q ..
- make -j $(nproc)
+ ./build/your_test --gtest_filter=TestSuite.TestName
```
-6. Static archive and dynamic linked libraries are found in the build
directory.
+### Run tests multiple times
-### Run Examples
+```shell
+bazel test --runs_per_test=10 //...
+```
- All follow-up commands should run from the workspace directory.
+### Test Coverage
-#### Publish messages to broker servers
+Generate coverage data and HTML report:
- Publish standard messages to your topic synchronously
+```shell
+bazel coverage -s \
+ --instrument_test_targets \
+ --experimental_cc_coverage \
+ --combined_report=lcov \
+
--coverage_report_generator=@bazel_tools//tools/test/CoverageOutputGenerator/java/com/google/devtools/coverageoutputgenerator:Main
\
+ //source/...
- ```starlark
- bazel run //examples:example_producer -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
- ```
-
- where `1024` is size of the message body to publish in bytes
+genhtml bazel-out/_coverage/_coverage_report.dat \
+ --output-directory coverage_html
+```
- ---
+## Run Examples
- Publish standard messages to your topic asynchronously
+All commands should run from the `cpp/` directory.
- ```starlark
- bazel run //examples:example_producer_with_async -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --message_body_size=1024 --total=16
- ```
+### Publish messages
- where `1024` is size of the message body to publish in bytes
+```shell
+# Standard messages (sync)
+bazel run //examples:example_producer -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --total=16
- ---
+# Standard messages (async)
+bazel run //examples:example_producer_with_async -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --total=16
- Publish FIFO messages to your topic
+# FIFO messages
+bazel run //examples:example_producer_with_fifo_message -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --total=16
- ```starlark
- bazel run //examples:example_producer_with_fifo_message --
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024
--total=16
- ```
-
- where `1024` is size of the message body to publish in bytes
+# Transactional messages
+bazel run //examples:example_producer_with_transactional_message -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --total=16
+```
- ---
+### Consume messages
- Publish transactional messages
+```shell
+# Push consumer (message listener)
+bazel run //examples:example_push_consumer -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID
- ```starlark
- bazel run //examples:example_producer_with_transactional_message --
--topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --message_body_size=1024
--total=16
- ```
+# Simple consumer (pull-based)
+bazel run //examples:example_simple_consumer -- \
+ --topic=YOUR_TOPIC --access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID
+```
- where `1024` is size of the message body to publish in bytes
+## Code Style
-#### Subscribe messages from broker servers
+Based on [Google C++ Code
Style](https://google.github.io/styleguide/cppguide.html), enforced by
`.clang-format` and `.clang-tidy`.
- Consume messages through Message Listener
+- C++11 standard, compatible with [gRPC's compiler
matrix](https://github.com/grpc/grpc/blob/master/BUILDING.md)
+- Exceptions only in public API wrapper classes; internal code uses
`std::error_code`
+- Smart pointers preferred; raw pointers only when necessary
- ```starlark
- bazel run //examples:example_push_consumer -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID
- ```
+Format code:
- ---
- Consume messages through raw, atomic API
+```shell
+./tools/format.sh
+```
- ```starlark
- bazel run //examples:example_simple_consumer -- --topic=YOUR_TOPIC
--access_point=SERVICE_ACCESS_POINT --group=YOUR_GROUP_ID
- ```
+## Dependency Management
-### IDE
+This SDK is built on top of gRPC. Before introducing a new third-party
dependency, check [gRPC
deps](https://github.com/grpc/grpc/blob/master/bazel/grpc_deps.bzl) and [gRPC
extra deps](https://github.com/grpc/grpc/blob/master/bazel/grpc_extra_deps.bzl)
first.
-[Visual Studio Code](https://code.visualstudio.com/) +
[Clangd](https://clangd.llvm.org/) is the recommended development toolset.
+All Bazel dependencies should use `maybe()` to ensure back-off compatibility.
-1. VSCode + Clangd
+## IDE Setup
- [Clangd](https://clangd.llvm.org/) is a really nice code completion tool.
Clangd requires compile_commands.json to work properly.
- To generate the file, run the following command:
+### VSCode + Clangd
- ```sh
- ./tools/gen_compile_commands.sh
- ```
+Generate `compile_commands.json` for clangd:
- Once the script completes, you should have compile_commands.json file in
the workspace directory, aka, ${repository}/cpp.
-
- LLVM project has an extension for
[clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd).
Please install it from the extension market.
-
- The following configuration entries should be appended to your VSC settings
file.
-
- ```text
- "C_Cpp.intelliSenseEngine": "Disabled",
- "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from
both extensions.
- "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles
from both extensions (clangd's seem to be more reliable anyway).
- "clangd.path": "/usr/bin/clangd",
- "clangd.arguments": [
- "-log=verbose",
- "-pretty",
- "--background-index",
- "--header-insertion=never",
- "--compile-commands-dir=${workspaceFolder}/",
- "--query-driver=**"
- ],
- "clangd.onConfigChanged": "restart",
- ```
+```shell
+./tools/gen_compile_commands.sh
+```
-2. CLion + Bazel Plugin
+### CLion + Bazel Plugin
- Bazel also has a plugin for CLion.
+CLion is supported via the [Bazel
plugin](https://plugins.jetbrains.com/plugin/8609-bazel).
[codecov-cpp-image]:
https://img.shields.io/codecov/c/gh/apache/rocketmq-clients/master?flag=cpp&label=CPP%20Coverage&logo=codecov
[codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
diff --git a/cpp/docs/assets/BasicMode.png b/cpp/docs/assets/BasicMode.png
deleted file mode 100644
index f0425851..00000000
Binary files a/cpp/docs/assets/BasicMode.png and /dev/null differ
diff --git a/cpp/docs/assets/class_diagram.png
b/cpp/docs/assets/class_diagram.png
deleted file mode 100644
index b22e3e46..00000000
Binary files a/cpp/docs/assets/class_diagram.png and /dev/null differ
diff --git a/cpp/proto/apache/rocketmq/v2/admin.proto
b/cpp/proto/apache/rocketmq/v2/admin.proto
deleted file mode 100644
index 7dbb7027..00000000
--- a/cpp/proto/apache/rocketmq/v2/admin.proto
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package apache.rocketmq.v2;
-
-option cc_enable_arenas = true;
-option csharp_namespace = "Apache.Rocketmq.V2";
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v2";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQAdmin";
-
-message ChangeLogLevelRequest {
- enum Level {
- TRACE = 0;
- DEBUG = 1;
- INFO = 2;
- WARN = 3;
- ERROR = 4;
- }
- Level level = 1;
-}
-
-message ChangeLogLevelResponse { string remark = 1; }
-
-service Admin {
- rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
-}
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/admin.proto
b/cpp/proto/apache/rocketmq/v2/admin.proto
new file mode 120000
index 00000000..60594b75
--- /dev/null
+++ b/cpp/proto/apache/rocketmq/v2/admin.proto
@@ -0,0 +1 @@
+../../../../../protos/apache/rocketmq/v2/admin.proto
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto
b/cpp/proto/apache/rocketmq/v2/definition.proto
deleted file mode 100644
index c0762b0c..00000000
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ /dev/null
@@ -1,582 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-package apache.rocketmq.v2;
-
-option csharp_namespace = "Apache.Rocketmq.V2";
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v2";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQDomain";
-
-enum TransactionResolution {
- TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
- COMMIT = 1;
- ROLLBACK = 2;
-}
-
-enum TransactionSource {
- SOURCE_UNSPECIFIED = 0;
- SOURCE_CLIENT = 1;
- SOURCE_SERVER_CHECK = 2;
-}
-
-enum Permission {
- PERMISSION_UNSPECIFIED = 0;
- NONE = 1;
- READ = 2;
- WRITE = 3;
- READ_WRITE = 4;
-}
-
-enum FilterType {
- FILTER_TYPE_UNSPECIFIED = 0;
- TAG = 1;
- SQL = 2;
-}
-
-message FilterExpression {
- FilterType type = 1;
- string expression = 2;
-}
-
-message RetryPolicy {
- int32 max_attempts = 1;
- oneof strategy {
- ExponentialBackoff exponential_backoff = 2;
- CustomizedBackoff customized_backoff = 3;
- }
-}
-
-// https://en.wikipedia.org/wiki/Exponential_backoff
-message ExponentialBackoff {
- google.protobuf.Duration initial = 1;
- google.protobuf.Duration max = 2;
- float multiplier = 3;
-}
-
-message CustomizedBackoff {
- // To support classic backoff strategy which is arbitrary defined by end
users.
- // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m
1h 2h`
- repeated google.protobuf.Duration next = 1;
-}
-
-message Resource {
- string resource_namespace = 1;
-
- // Resource name identifier, which remains unique within the abstract
resource
- // namespace.
- string name = 2;
-}
-
-message SubscriptionEntry {
- Resource topic = 1;
- FilterExpression expression = 2;
-}
-
-enum AddressScheme {
- ADDRESS_SCHEME_UNSPECIFIED = 0;
- IPv4 = 1;
- IPv6 = 2;
- DOMAIN_NAME = 3;
-}
-
-message Address {
- string host = 1;
- int32 port = 2;
-}
-
-message Endpoints {
- AddressScheme scheme = 1;
- repeated Address addresses = 2;
-}
-
-message Broker {
- // Name of the broker
- string name = 1;
-
- // Broker index. Canonically, index = 0 implies that the broker is playing
- // leader role while brokers with index > 0 play follower role.
- int32 id = 2;
-
- // Address of the broker, complying with the following scheme
- // 1. dns:[//authority/]host[:port]
- // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
- // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
- Endpoints endpoints = 3;
-}
-
-message MessageQueue {
- Resource topic = 1;
- int32 id = 2;
- Permission permission = 3;
- Broker broker = 4;
- repeated MessageType accept_message_types = 5;
-}
-
-enum MessageType {
- MESSAGE_TYPE_UNSPECIFIED = 0;
-
- NORMAL = 1;
-
- // Sequenced message
- FIFO = 2;
-
- // Messages that are delivered after the specified duration.
- DELAY = 3;
-
- // Messages that are transactional. Only committed messages are delivered to
- // subscribers.
- TRANSACTION = 4;
-
- // lite topic
- LITE = 5;
-
- // Messages that lower prioritised ones may need to wait for higher priority
messages to be processed first
- PRIORITY = 6;
-}
-
-enum DigestType {
- DIGEST_TYPE_UNSPECIFIED = 0;
-
- // CRC algorithm achieves goal of detecting random data error with lowest
- // computation overhead.
- CRC32 = 1;
-
- // MD5 algorithm achieves good balance between collision rate and computation
- // overhead.
- MD5 = 2;
-
- // SHA-family has substantially fewer collision with fair amount of
- // computation.
- SHA1 = 3;
-}
-
-// When publishing messages to or subscribing messages from brokers, clients
-// shall include or validate digests of message body to ensure data integrity.
-//
-// For message publishing, when an invalid digest were detected, brokers need
-// respond client with BAD_REQUEST.
-//
-// For messages subscription, when an invalid digest were detected, consumers
-// need to handle this case according to message type:
-// 1) Standard messages should be negatively acknowledged instantly, causing
-// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
-// previously acquired messages batch;
-message Digest {
- DigestType type = 1;
- string checksum = 2;
-}
-
-enum ClientType {
- CLIENT_TYPE_UNSPECIFIED = 0;
- PRODUCER = 1;
- PUSH_CONSUMER = 2;
- SIMPLE_CONSUMER = 3;
- PULL_CONSUMER = 4;
-}
-
-enum Encoding {
- ENCODING_UNSPECIFIED = 0;
-
- IDENTITY = 1;
-
- GZIP = 2;
-}
-
-message SystemProperties {
- // Tag, which is optional.
- optional string tag = 1;
-
- // Message keys
- repeated string keys = 2;
-
- // Message identifier, client-side generated, remains unique.
- // if message_id is empty, the send message request will be aborted with
- // status `INVALID_ARGUMENT`
- string message_id = 3;
-
- // Message body digest
- Digest body_digest = 4;
-
- // Message body encoding. Candidate options are identity, gzip, snappy etc.
- Encoding body_encoding = 5;
-
- // Message type, normal, FIFO or transactional.
- MessageType message_type = 6;
-
- // Message born time-point.
- google.protobuf.Timestamp born_timestamp = 7;
-
- // Message born host. Valid options are IPv4, IPv6 or client host domain
name.
- string born_host = 8;
-
- // Time-point at which the message is stored in the broker, which is absent
- // for message publishing.
- optional google.protobuf.Timestamp store_timestamp = 9;
-
- // The broker that stores this message. It may be broker name, IP or
arbitrary
- // identifier that uniquely identify the server.
- string store_host = 10;
-
- // Time-point at which broker delivers to clients, which is optional.
- optional google.protobuf.Timestamp delivery_timestamp = 11;
-
- // If a message is acquired by way of POP, this field holds the receipt,
- // which is absent for message publishing.
- // Clients use the receipt to acknowledge or negatively acknowledge the
- // message.
- optional string receipt_handle = 12;
-
- // Message queue identifier in which a message is physically stored.
- int32 queue_id = 13;
-
- // Message-queue offset at which a message is stored, which is absent for
- // message publishing.
- optional int64 queue_offset = 14;
-
- // Period of time servers would remain invisible once a message is acquired.
- optional google.protobuf.Duration invisible_duration = 15;
-
- // Business code may failed to process messages for the moment. Hence,
clients
- // may request servers to deliver them again using certain back-off strategy,
- // the attempt is 1 not 0 if message is delivered first time, and it is
absent
- // for message publishing.
- optional int32 delivery_attempt = 16;
-
- // Define the group name of message in the same topic, which is optional.
- optional string message_group = 17;
-
- // Trace context for each message, which is optional.
- optional string trace_context = 18;
-
- // If a transactional message stay unresolved for more than
- // `transaction_orphan_threshold`, it would be regarded as an
- // orphan. Servers that manages orphan messages would pick up
- // a capable publisher to resolve
- optional google.protobuf.Duration orphaned_transaction_recovery_duration =
19;
-
- // Information to identify whether this message is from dead letter queue.
- optional DeadLetterQueue dead_letter_queue = 20;
-
- // lite topic
- optional string lite_topic = 21;
-
- // Priority of message, which is optional
- optional int32 priority = 22;
-}
-
-message DeadLetterQueue {
- // Original topic for this DLQ message.
- string topic = 1;
- // Original message id for this DLQ message.
- string message_id = 2;
-}
-
-message Message {
-
- Resource topic = 1;
-
- // User defined key-value pairs.
- // If user_properties contain the reserved keys by RocketMQ,
- // the send message request will be aborted with status `INVALID_ARGUMENT`.
- // See below links for the reserved keys
- //
https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
- map<string, string> user_properties = 2;
-
- SystemProperties system_properties = 3;
-
- bytes body = 4;
-}
-
-message Assignment {
- MessageQueue message_queue = 1;
-}
-
-enum Code {
- CODE_UNSPECIFIED = 0;
-
- // Generic code for success.
- OK = 20000;
-
- // Generic code for multiple return results.
- MULTIPLE_RESULTS = 30000;
-
- // Generic code for bad request, indicating that required fields or headers
are missing.
- BAD_REQUEST = 40000;
- // Format of access point is illegal.
- ILLEGAL_ACCESS_POINT = 40001;
- // Format of topic is illegal.
- ILLEGAL_TOPIC = 40002;
- // Format of consumer group is illegal.
- ILLEGAL_CONSUMER_GROUP = 40003;
- // Format of message tag is illegal.
- ILLEGAL_MESSAGE_TAG = 40004;
- // Format of message key is illegal.
- ILLEGAL_MESSAGE_KEY = 40005;
- // Format of message group is illegal.
- ILLEGAL_MESSAGE_GROUP = 40006;
- // Format of message property key is illegal.
- ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
- // Transaction id is invalid.
- INVALID_TRANSACTION_ID = 40008;
- // Format of message id is illegal.
- ILLEGAL_MESSAGE_ID = 40009;
- // Format of filter expression is illegal.
- ILLEGAL_FILTER_EXPRESSION = 40010;
- // The invisible time of request is invalid.
- ILLEGAL_INVISIBLE_TIME = 40011;
- // The delivery timestamp of message is invalid.
- ILLEGAL_DELIVERY_TIME = 40012;
- // Receipt handle of message is invalid.
- INVALID_RECEIPT_HANDLE = 40013;
- // Message property conflicts with its type.
- MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
- // Client type could not be recognized.
- UNRECOGNIZED_CLIENT_TYPE = 40015;
- // Message is corrupted.
- MESSAGE_CORRUPTED = 40016;
- // Request is rejected due to missing of x-mq-client-id header.
- CLIENT_ID_REQUIRED = 40017;
- // Polling time is illegal.
- ILLEGAL_POLLING_TIME = 40018;
- // Offset is illegal.
- ILLEGAL_OFFSET = 40019;
-
- // Generic code indicates that the client request lacks valid authentication
- // credentials for the requested resource.
- UNAUTHORIZED = 40100;
-
- // Generic code indicates that the account is suspended due to overdue of
payment.
- PAYMENT_REQUIRED = 40200;
-
- // Generic code for the case that user does not have the permission to
operate.
- FORBIDDEN = 40300;
-
- // Generic code for resource not found.
- NOT_FOUND = 40400;
- // Message not found from server.
- MESSAGE_NOT_FOUND = 40401;
- // Topic resource does not exist.
- TOPIC_NOT_FOUND = 40402;
- // Consumer group resource does not exist.
- CONSUMER_GROUP_NOT_FOUND = 40403;
- // Offset not found from server.
- OFFSET_NOT_FOUND = 40404;
-
- // Generic code representing client side timeout when connecting to, reading
data from, or write data to server.
- REQUEST_TIMEOUT = 40800;
-
- // Generic code represents that the request entity is larger than limits
defined by server.
- PAYLOAD_TOO_LARGE = 41300;
- // Message body size exceeds the threshold.
- MESSAGE_BODY_TOO_LARGE = 41301;
- // Message body is empty.
- MESSAGE_BODY_EMPTY = 41302;
-
- // Generic code for use cases where pre-conditions are not met.
- // For example, if a producer instance is used to publish messages without
prior start() invocation,
- // this error code will be raised.
- PRECONDITION_FAILED = 42800;
-
- // Generic code indicates that too many requests are made in short period of
duration.
- // Requests are throttled.
- TOO_MANY_REQUESTS = 42900;
-
- // Generic code for the case that the server is unwilling to process the
request because its header fields are too large.
- // The request may be resubmitted after reducing the size of the request
header fields.
- REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
- // Message properties total size exceeds the threshold.
- MESSAGE_PROPERTIES_TOO_LARGE = 43101;
-
- // Generic code indicates that server/client encountered an unexpected
- // condition that prevented it from fulfilling the request.
- INTERNAL_ERROR = 50000;
- // Code indicates that the server encountered an unexpected condition
- // that prevented it from fulfilling the request.
- // This error response is a generic "catch-all" response.
- // Usually, this indicates the server cannot find a better alternative
- // error code to response. Sometimes, server administrators log error
- // responses like the 500 status code with more details about the request
- // to prevent the error from happening again in the future.
- //
- // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
- INTERNAL_SERVER_ERROR = 50001;
- // The HA-mechanism is not working now.
- HA_NOT_AVAILABLE = 50002;
-
- // Generic code means that the server or client does not support the
- // functionality required to fulfill the request.
- NOT_IMPLEMENTED = 50100;
-
- // Generic code represents that the server, which acts as a gateway or proxy,
- // does not get an satisfied response in time from its upstream servers.
- // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
- PROXY_TIMEOUT = 50400;
- // Message persistence timeout.
- MASTER_PERSISTENCE_TIMEOUT = 50401;
- // Slave persistence timeout.
- SLAVE_PERSISTENCE_TIMEOUT = 50402;
-
- // Generic code for unsupported operation.
- UNSUPPORTED = 50500;
- // Operation is not allowed in current version.
- VERSION_UNSUPPORTED = 50501;
- // Not allowed to verify message. Chances are that you are verifying
- // a FIFO message, as is violating FIFO semantics.
- VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
-
- // Generic code for failed message consumption.
- FAILED_TO_CONSUME_MESSAGE = 60000;
-}
-
-message Status {
- Code code = 1;
- string message = 2;
-}
-
-enum Language {
- LANGUAGE_UNSPECIFIED = 0;
- JAVA = 1;
- CPP = 2;
- DOT_NET = 3;
- GOLANG = 4;
- RUST = 5;
- PYTHON = 6;
- PHP = 7;
- NODE_JS = 8;
- RUBY = 9;
- OBJECTIVE_C = 10;
- DART = 11;
- KOTLIN = 12;
-}
-
-// User Agent
-message UA {
- // SDK language
- Language language = 1;
-
- // SDK version
- string version = 2;
-
- // Platform details, including OS name, version, arch etc.
- string platform = 3;
-
- // Hostname of the node
- string hostname = 4;
-}
-
-message Settings {
- // Configurations for all clients.
- optional ClientType client_type = 1;
-
- optional Endpoints access_point = 2;
-
- // If publishing of messages encounters throttling or server internal errors,
- // publishers should implement automatic retries after progressive longer
- // back-offs for consecutive errors.
- //
- // When processing message fails, `backoff_policy` describes an interval
- // after which the message should be available to consume again.
- //
- // For FIFO messages, the interval should be relatively small because
- // messages of the same message group would not be readily available until
- // the prior one depletes its lifecycle.
- optional RetryPolicy backoff_policy = 3;
-
- // Request timeout for RPCs excluding long-polling.
- optional google.protobuf.Duration request_timeout = 4;
-
- oneof pub_sub {
- Publishing publishing = 5;
-
- Subscription subscription = 6;
- }
-
- // User agent details
- UA user_agent = 7;
-
- Metric metric = 8;
-}
-
-message Publishing {
- // Publishing settings below here is appointed by client, thus it is
- // unnecessary for server to push at present.
- //
- // List of topics to which messages will publish to.
- repeated Resource topics = 1;
-
- // If the message body size exceeds `max_body_size`, broker servers would
- // reject the request. As a result, it is advisable that Producer performs
- // client-side check validation.
- int32 max_body_size = 2;
-
- // When `validate_message_type` flag set `false`, no need to validate
message's type
- // with messageQueue's `accept_message_types` before publishing.
- bool validate_message_type = 3;
-}
-
-message Subscription {
- // Subscription settings below here is appointed by client, thus it is
- // unnecessary for server to push at present.
- //
- // Consumer group.
- optional Resource group = 1;
-
- // Subscription for consumer.
- repeated SubscriptionEntry subscriptions = 2;
-
- // Subscription settings below here are from server, it is essential for
- // server to push.
- //
- // When FIFO flag is `true`, messages of the same message group are processed
- // in first-in-first-out manner.
- //
- // Brokers will not deliver further messages of the same group until prior
- // ones are completely acknowledged.
- optional bool fifo = 3;
-
- // Message receive batch size here is essential for push consumer.
- optional int32 receive_batch_size = 4;
-
- // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
- // push consumer.
- optional google.protobuf.Duration long_polling_timeout = 5;
-}
-
-message Metric {
- // Indicates that if client should export local metrics to server.
- bool on = 1;
-
- // The endpoint that client metrics should be exported to, which is required
if the switch is on.
- optional Endpoints endpoints = 2;
-}
-
-enum QueryOffsetPolicy {
- // Use this option if client wishes to playback all existing messages.
- BEGINNING = 0;
-
- // Use this option if client wishes to skip all existing messages.
- END = 1;
-
- // Use this option if time-based seek is targeted.
- TIMESTAMP = 2;
-}
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto
b/cpp/proto/apache/rocketmq/v2/definition.proto
new file mode 120000
index 00000000..a85a96e2
--- /dev/null
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1 @@
+../../../../../protos/apache/rocketmq/v2/definition.proto
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto
b/cpp/proto/apache/rocketmq/v2/service.proto
deleted file mode 100644
index 1a3dbbe9..00000000
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ /dev/null
@@ -1,438 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-
-import "apache/rocketmq/v2/definition.proto";
-
-package apache.rocketmq.v2;
-
-option csharp_namespace = "Apache.Rocketmq.V2";
-option java_multiple_files = true;
-option java_package = "apache.rocketmq.v2";
-option java_generate_equals_and_hash = true;
-option java_string_check_utf8 = true;
-option java_outer_classname = "MQService";
-
-// Topics are destination of messages to publish to or subscribe from. Similar
-// to domain names, they will be addressable after resolution through the
-// provided access point.
-//
-// Access points are usually the addresses of name servers, which fulfill
-// service discovery, load-balancing and other auxiliary services. Name servers
-// receive periodic heartbeats from affiliate brokers and erase those which
-// failed to maintain alive status.
-//
-// Name servers answer queries of QueryRouteRequest, responding clients with
-// addressable message-queues, which they may directly publish messages to or
-// subscribe messages from.
-//
-// QueryRouteRequest shall include source endpoints, aka, configured
-// access-point, which annotates tenant-id, instance-id or other
-// vendor-specific settings. Purpose-built name servers may respond customized
-// results based on these particular requirements.
-message QueryRouteRequest {
- Resource topic = 1;
- Endpoints endpoints = 2;
-}
-
-message QueryRouteResponse {
- Status status = 1;
-
- repeated MessageQueue message_queues = 2;
-}
-
-message SendMessageRequest {
- repeated Message messages = 1;
-}
-
-message SendResultEntry {
- Status status = 1;
- string message_id = 2;
- string transaction_id = 3;
- int64 offset = 4;
- // Unique handle to identify message to recall, support delay message for
now.
- string recall_handle = 5;
-}
-
-message SendMessageResponse {
- Status status = 1;
-
- // Some implementation may have partial failure issues. Client SDK
developers are expected to inspect
- // each entry for best certainty.
- repeated SendResultEntry entries = 2;
-}
-
-message QueryAssignmentRequest {
- Resource topic = 1;
- Resource group = 2;
- Endpoints endpoints = 3;
-}
-
-message QueryAssignmentResponse {
- Status status = 1;
- repeated Assignment assignments = 2;
-}
-
-message ReceiveMessageRequest {
- Resource group = 1;
- MessageQueue message_queue = 2;
- FilterExpression filter_expression = 3;
- int32 batch_size = 4;
- // Required if client type is simple consumer.
- optional google.protobuf.Duration invisible_duration = 5;
- // For message auto renew and clean
- bool auto_renew = 6;
- optional google.protobuf.Duration long_polling_timeout = 7;
- optional string attempt_id = 8;
-}
-
-message ReceiveMessageResponse {
- oneof content {
- Status status = 1;
- Message message = 2;
- // The timestamp that brokers start to deliver status line or message.
- google.protobuf.Timestamp delivery_timestamp = 3;
- }
-}
-
-message AckMessageEntry {
- string message_id = 1;
- string receipt_handle = 2;
-}
-
-message AckMessageRequest {
- Resource group = 1;
- Resource topic = 2;
- repeated AckMessageEntry entries = 3;
-}
-
-message AckMessageResultEntry {
- string message_id = 1;
- string receipt_handle = 2;
-
- // Acknowledge result may be acquired through inspecting
- // `status.code`; In case acknowledgement failed, `status.message`
- // is the explanation of the failure.
- Status status = 3;
-}
-
-message AckMessageResponse {
-
- // RPC tier status, which is used to represent RPC-level errors including
- // authentication, authorization, throttling and other general failures.
- Status status = 1;
-
- repeated AckMessageResultEntry entries = 2;
-}
-
-message ForwardMessageToDeadLetterQueueRequest {
- Resource group = 1;
- Resource topic = 2;
- string receipt_handle = 3;
- string message_id = 4;
- int32 delivery_attempt = 5;
- int32 max_delivery_attempts = 6;
-}
-
-message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
-
-message HeartbeatRequest {
- optional Resource group = 1;
- ClientType client_type = 2;
-}
-
-message HeartbeatResponse { Status status = 1; }
-
-message EndTransactionRequest {
- Resource topic = 1;
- string message_id = 2;
- string transaction_id = 3;
- TransactionResolution resolution = 4;
- TransactionSource source = 5;
- string trace_context = 6;
-}
-
-message EndTransactionResponse { Status status = 1; }
-
-message PrintThreadStackTraceCommand { string nonce = 1; }
-
-message ThreadStackTrace {
- string nonce = 1;
- optional string thread_stack_trace = 2;
-}
-
-message VerifyMessageCommand {
- string nonce = 1;
- Message message = 2;
-}
-
-message VerifyMessageResult {
- string nonce = 1;
-}
-
-message RecoverOrphanedTransactionCommand {
- Message message = 1;
- string transaction_id = 2;
-}
-
-message TelemetryCommand {
- optional Status status = 1;
-
- oneof command {
- // Client settings
- Settings settings = 2;
-
- // These messages are from client.
- //
- // Report thread stack trace to server.
- ThreadStackTrace thread_stack_trace = 3;
-
- // Report message verify result to server.
- VerifyMessageResult verify_message_result = 4;
-
- // There messages are from server.
- //
- // Request client to recover the orphaned transaction message.
- RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
-
- // Request client to print thread stack trace.
- PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
-
- // Request client to verify the consumption of the appointed message.
- VerifyMessageCommand verify_message_command = 7;
- }
-}
-
-message NotifyClientTerminationRequest {
- // Consumer group, which is absent for producer.
- optional Resource group = 1;
-}
-
-message NotifyClientTerminationResponse { Status status = 1; }
-
-message ChangeInvisibleDurationRequest {
- Resource group = 1;
- Resource topic = 2;
-
- // Unique receipt handle to identify message to change
- string receipt_handle = 3;
-
- // New invisible duration
- google.protobuf.Duration invisible_duration = 4;
-
- // For message tracing
- string message_id = 5;
-}
-
-message ChangeInvisibleDurationResponse {
- Status status = 1;
-
- // Server may generate a new receipt handle for the message.
- string receipt_handle = 2;
-}
-
-message PullMessageRequest {
- Resource group = 1;
- MessageQueue message_queue = 2;
- int64 offset = 3;
- int32 batch_size = 4;
- FilterExpression filter_expression = 5;
- google.protobuf.Duration long_polling_timeout = 6;
-}
-
-message PullMessageResponse {
- oneof content {
- Status status = 1;
- Message message = 2;
- int64 next_offset = 3;
- }
-}
-
-message UpdateOffsetRequest {
- Resource group = 1;
- MessageQueue message_queue = 2;
- int64 offset = 3;
-}
-
-message UpdateOffsetResponse {
- Status status = 1;
-}
-
-message GetOffsetRequest {
- Resource group = 1;
- MessageQueue message_queue = 2;
-}
-
-message GetOffsetResponse {
- Status status = 1;
- int64 offset = 2;
-}
-
-message QueryOffsetRequest {
- MessageQueue message_queue = 1;
- QueryOffsetPolicy query_offset_policy = 2;
- optional google.protobuf.Timestamp timestamp = 3;
-}
-
-message QueryOffsetResponse {
- Status status = 1;
- int64 offset = 2;
-}
-
-message RecallMessageRequest {
- Resource topic = 1;
- // Refer to SendResultEntry.
- string recall_handle = 2;
-}
-
-message RecallMessageResponse {
- Status status = 1;
- string message_id = 2;
-}
-
-// For all the RPCs in MessagingService, the following error handling policies
-// apply:
-//
-// If the request doesn't bear a valid authentication credential, return a
-// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
-// user is not granted with sufficient permission to execute the requested
-// operation, return a response with common.status.code == `PERMISSION_DENIED`.
-// If the per-user-resource-based quota is exhausted, return a response with
-// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
-// errors raise, return a response with common.status.code == `INTERNAL`.
-service MessagingService {
-
- // Queries the route entries of the requested topic in the perspective of the
- // given endpoints. On success, servers should return a collection of
- // addressable message-queues. Note servers may return customized route
- // entries based on endpoints provided.
- //
- // If the requested topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
- rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
-
- // Producer or consumer sends HeartbeatRequest to servers periodically to
- // keep-alive. Additionally, it also reports client-side configuration,
- // including topic subscription, load-balancing group name, etc.
- //
- // Returns `OK` if success.
- //
- // If a client specifies a language that is not yet supported by servers,
- // returns `INVALID_ARGUMENT`
- rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
-
- // Delivers messages to brokers.
- // Clients may further:
- // 1. Refine a message destination to message-queues which fulfills parts of
- // FIFO semantic;
- // 2. Flag a message as transactional, which keeps it invisible to consumers
- // until it commits;
- // 3. Time a message, making it invisible to consumers till specified
- // time-point;
- // 4. And more...
- //
- // Returns message-id or transaction-id with status `OK` on success.
- //
- // If the destination topic doesn't exist, returns `NOT_FOUND`.
- rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
-
- // Queries the assigned route info of a topic for current consumer,
- // the returned assignment result is decided by server-side load balancer.
- //
- // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
- rpc QueryAssignment(QueryAssignmentRequest) returns
(QueryAssignmentResponse) {
- }
-
- // Receives messages from the server in batch manner, returns a set of
- // messages if success. The received messages should be acked or redelivered
- // after processed.
- //
- // If the pending concurrent receive requests exceed the quota of the given
- // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
- // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
- // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
- // message in the specific topic, returns `OK` with an empty message set.
- // Please note that client may suffer from false empty responses.
- //
- // If failed to receive message from remote, server must return only one
- // `ReceiveMessageResponse` as the reply to the request, whose `Status`
indicates
- // the specific reason of failure, otherwise, the reply is considered
successful.
- rpc ReceiveMessage(ReceiveMessageRequest) returns (stream
ReceiveMessageResponse) {
- }
-
- // Acknowledges the message associated with the `receipt_handle` or `offset`
- // in the `AckMessageRequest`, it means the message has been successfully
- // processed. Returns `OK` if the message server remove the relevant message
- // successfully.
- //
- // If the given receipt_handle is illegal or out of date, returns
- // `INVALID_ARGUMENT`.
- rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
-
- // Forwards one message to dead letter queue if the max delivery attempts is
- // exceeded by this message at client-side, return `OK` if success.
- rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
- returns (ForwardMessageToDeadLetterQueueResponse) {}
-
- // PullMessage and ReceiveMessage RPCs serve a similar purpose,
- // which is to attempt to get messages from the server, but with different
semantics.
- rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
-
- // Update the consumption progress of the designated queue of the
- // consumer group to the remote.
- rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
-
- // Query the consumption progress of the designated queue of the
- // consumer group to the remote.
- rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
-
- // Query the offset of the designated queue by the query offset policy.
- rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
-
- // Commits or rollback one transactional message.
- rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
-
- // Once a client starts, it would immediately establishes bi-lateral stream
- // RPCs with brokers, reporting its settings as the initiative command.
- //
- // When servers have need of inspecting client status, they would issue
- // telemetry commands to clients. After executing received instructions,
- // clients shall report command execution results through client-side
streams.
- rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
-
- // Notify the server that the client is terminated.
- rpc NotifyClientTermination(NotifyClientTerminationRequest) returns
(NotifyClientTerminationResponse) {
- }
-
- // Once a message is retrieved from consume queue on behalf of the group, it
- // will be kept invisible to other clients of the same group for a period of
- // time. The message is supposed to be processed within the invisible
- // duration. If the client, which is in charge of the invisible message, is
- // not capable of processing the message timely, it may use
- // ChangeInvisibleDuration to lengthen invisible duration.
- rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns
(ChangeInvisibleDurationResponse) {
- }
-
- // Recall a message,
- // for delay message, should recall before delivery time, like the rollback
operation of transaction message,
- // for normal message, not supported for now.
- rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
- }
-}
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto
b/cpp/proto/apache/rocketmq/v2/service.proto
new file mode 120000
index 00000000..5d98f3f9
--- /dev/null
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -0,0 +1 @@
+../../../../../protos/apache/rocketmq/v2/service.proto
\ No newline at end of file
diff --git a/cpp/source/CMakeLists.txt b/cpp/source/CMakeLists.txt
index dbf5ab01..d860f8e4 100644
--- a/cpp/source/CMakeLists.txt
+++ b/cpp/source/CMakeLists.txt
@@ -53,10 +53,14 @@ target_link_libraries(rocketmq_shared
opencensus::stats
opencensus_proto
spdlog)
-set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map)
-set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS}
-Wl,--version-script=${VERSION_SCRIPT}")
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map)
+ set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS}
-Wl,--version-script=${VERSION_SCRIPT}")
+ set_target_properties(rocketmq_shared
+ PROPERTIES
+ LINK_DEPENDS ${VERSION_SCRIPT})
+endif()
set_target_properties(rocketmq_shared
PROPERTIES
- LINK_DEPENDS ${VERSION_SCRIPT}
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
LIBRARY_OUTPUT_NAME rocketmq)
\ No newline at end of file
diff --git a/cpp/source/admin/tests/CMakeLists.txt
b/cpp/source/admin/tests/CMakeLists.txt
new file mode 100644
index 00000000..20829999
--- /dev/null
+++ b/cpp/source/admin/tests/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_executable(admin_server_test AdminServerTest.cpp)
+target_include_directories(admin_server_test PRIVATE
${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+target_link_libraries(admin_server_test PRIVATE ${ROCKETMQ_TEST_LINK_LIBS})
+add_test(NAME admin_server_test COMMAND admin_server_test)
diff --git a/cpp/source/base/include/FmtEnumFormatter.h
b/cpp/source/base/include/FmtEnumFormatter.h
new file mode 100644
index 00000000..8901391f
--- /dev/null
+++ b/cpp/source/base/include/FmtEnumFormatter.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "fmt/format.h"
+#include "rocketmq/State.h"
+#include "apache/rocketmq/v2/definition.pb.h"
+#include "grpcpp/support/status.h"
+
+template <>
+struct fmt::formatter<ROCKETMQ_NAMESPACE::State> : formatter<string_view> {
+ template <typename FormatContext>
+ auto format(ROCKETMQ_NAMESPACE::State s, FormatContext& ctx) ->
decltype(ctx.out()) {
+ string_view name;
+ switch (s) {
+ case ROCKETMQ_NAMESPACE::State::CREATED: name = "CREATED"; break;
+ case ROCKETMQ_NAMESPACE::State::STARTING: name = "STARTING"; break;
+ case ROCKETMQ_NAMESPACE::State::STARTED: name = "STARTED"; break;
+ case ROCKETMQ_NAMESPACE::State::STOPPING: name = "STOPPING"; break;
+ case ROCKETMQ_NAMESPACE::State::STOPPED: name = "STOPPED"; break;
+ default: name = "UNKNOWN"; break;
+ }
+ return formatter<string_view>::format(name, ctx);
+ }
+};
+
+template <>
+struct fmt::formatter<grpc::StatusCode> : formatter<string_view> {
+ template <typename FormatContext>
+ auto format(grpc::StatusCode code, FormatContext& ctx) ->
decltype(ctx.out()) {
+ string_view name;
+ switch (code) {
+ case grpc::StatusCode::OK: name = "OK"; break;
+ case grpc::StatusCode::CANCELLED: name = "CANCELLED"; break;
+ case grpc::StatusCode::UNKNOWN: name = "UNKNOWN"; break;
+ case grpc::StatusCode::INVALID_ARGUMENT: name = "INVALID_ARGUMENT";
break;
+ case grpc::StatusCode::DEADLINE_EXCEEDED: name = "DEADLINE_EXCEEDED";
break;
+ case grpc::StatusCode::NOT_FOUND: name = "NOT_FOUND"; break;
+ case grpc::StatusCode::ALREADY_EXISTS: name = "ALREADY_EXISTS";
break;
+ case grpc::StatusCode::PERMISSION_DENIED: name = "PERMISSION_DENIED";
break;
+ case grpc::StatusCode::UNAUTHENTICATED: name = "UNAUTHENTICATED";
break;
+ case grpc::StatusCode::RESOURCE_EXHAUSTED: name = "RESOURCE_EXHAUSTED";
break;
+ case grpc::StatusCode::FAILED_PRECONDITION: name =
"FAILED_PRECONDITION"; break;
+ case grpc::StatusCode::ABORTED: name = "ABORTED"; break;
+ case grpc::StatusCode::OUT_OF_RANGE: name = "OUT_OF_RANGE"; break;
+ case grpc::StatusCode::UNIMPLEMENTED: name = "UNIMPLEMENTED";
break;
+ case grpc::StatusCode::INTERNAL: name = "INTERNAL"; break;
+ case grpc::StatusCode::UNAVAILABLE: name = "UNAVAILABLE"; break;
+ case grpc::StatusCode::DATA_LOSS: name = "DATA_LOSS"; break;
+ default: name = "UNKNOWN"; break;
+ }
+ return formatter<string_view>::format(name, ctx);
+ }
+};
+
+template <>
+struct fmt::formatter<apache::rocketmq::v2::Code> : formatter<int> {
+ template <typename FormatContext>
+ auto format(apache::rocketmq::v2::Code code, FormatContext& ctx) ->
decltype(ctx.out()) {
+ return formatter<int>::format(static_cast<int>(code), ctx);
+ }
+};
diff --git a/cpp/source/base/tests/CMakeLists.txt
b/cpp/source/base/tests/CMakeLists.txt
new file mode 100644
index 00000000..ba5ea068
--- /dev/null
+++ b/cpp/source/base/tests/CMakeLists.txt
@@ -0,0 +1,16 @@
+macro(add_base_test TEST_NAME TEST_SRC)
+ add_executable(${TEST_NAME} ${TEST_SRC})
+ target_include_directories(${TEST_NAME} PRIVATE
${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+ target_link_libraries(${TEST_NAME} PRIVATE ${ROCKETMQ_TEST_LINK_LIBS})
+ add_test(NAME ${TEST_NAME} COMMAND ${TEST_NAME})
+endmacro()
+
+add_base_test(assignment_test AssignmentTest.cpp)
+add_base_test(configuration_test ConfigurationTest.cpp)
+add_base_test(invocation_context_test InvocationContextTest.cpp)
+add_base_test(message_builder_test MessageBuilderTest.cpp)
+add_base_test(message_queue_test MessageQueueTest.cpp)
+add_base_test(mix_all_test MixAllTest.cpp)
+add_base_test(retry_policy_test RetryPolicyTest.cpp)
+add_base_test(thread_pool_test ThreadPoolTest.cpp)
+add_base_test(unique_id_generator_test UniqueIdGeneratorTest.cpp)
diff --git a/cpp/source/client/ClientManagerImpl.cpp
b/cpp/source/client/ClientManagerImpl.cpp
index 9f6d268e..0441c8f2 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "ClientManagerImpl.h"
+#include "FmtEnumFormatter.h"
#include <atomic>
#include <cassert>
diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp
b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 03831197..999d49d9 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -16,6 +16,7 @@
*/
#include "ReceiveMessageStreamReader.h"
+#include "FmtEnumFormatter.h"
#include <chrono>
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index 2e12f437..7cd0c218 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "TelemetryBidiReactor.h"
+#include "FmtEnumFormatter.h"
#include <memory>
#include <utility>
diff --git a/cpp/source/client/tests/CMakeLists.txt
b/cpp/source/client/tests/CMakeLists.txt
new file mode 100644
index 00000000..4197c5b8
--- /dev/null
+++ b/cpp/source/client/tests/CMakeLists.txt
@@ -0,0 +1,19 @@
+set(CLIENT_MOCK_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/../mocks/include
+ ${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+
+macro(add_client_test TEST_NAME TEST_SRC)
+ add_executable(${TEST_NAME} ${TEST_SRC})
+ target_include_directories(${TEST_NAME} PRIVATE ${CLIENT_MOCK_DIRS})
+ target_link_libraries(${TEST_NAME} PRIVATE ${ROCKETMQ_TEST_LINK_LIBS}
OpenSSL::SSL)
+ add_test(NAME ${TEST_NAME} COMMAND ${TEST_NAME})
+endmacro()
+
+add_client_test(tls_helper_test TlsHelperTest.cpp)
+# RpcClientTest requires google/rpc/code.pb.h (googleapis, only available via
Bazel)
+# add_client_test(rpc_client_test RpcClientTest.cpp)
+add_client_test(client_test ClientTest.cpp)
+add_client_test(client_manager_test ClientManagerTest.cpp)
+add_client_test(topic_assignment_info_test TopicAssignmentInfoTest.cpp)
+# TracingUtilityTest requires OpenTelemetry headers (only available via Bazel)
+# add_client_test(tracing_utility_test TracingUtilityTest.cpp)
diff --git a/cpp/source/concurrent/tests/CMakeLists.txt
b/cpp/source/concurrent/tests/CMakeLists.txt
new file mode 100644
index 00000000..771a5111
--- /dev/null
+++ b/cpp/source/concurrent/tests/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_executable(countdown_latch_test CountdownLatchTest.cpp)
+target_include_directories(countdown_latch_test PRIVATE
${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+target_link_libraries(countdown_latch_test PRIVATE ${ROCKETMQ_TEST_LINK_LIBS}
absl::memory)
+add_test(NAME countdown_latch_test COMMAND countdown_latch_test)
diff --git a/cpp/source/concurrent/tests/CountdownLatchTest.cpp
b/cpp/source/concurrent/tests/CountdownLatchTest.cpp
index e0d0c4f1..a3f1ad70 100644
--- a/cpp/source/concurrent/tests/CountdownLatchTest.cpp
+++ b/cpp/source/concurrent/tests/CountdownLatchTest.cpp
@@ -19,6 +19,7 @@
#include <mutex>
#include <thread>
+#include "absl/memory/memory.h"
#include "CountdownLatch.h"
#include "gtest/gtest.h"
#include "rocketmq/RocketMQ.h"
diff --git a/cpp/source/rocketmq/ClientImpl.cpp
b/cpp/source/rocketmq/ClientImpl.cpp
index 19b4e460..169df412 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "ClientImpl.h"
+#include "FmtEnumFormatter.h"
#include <algorithm>
#include <atomic>
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index 802c151c..184f542f 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "ProducerImpl.h"
+#include "FmtEnumFormatter.h"
#include <algorithm>
#include <cassert>
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp
b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 45f40541..90632035 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "PushConsumerImpl.h"
+#include "FmtEnumFormatter.h"
#include <cassert>
#include <chrono>
diff --git a/cpp/source/rocketmq/tests/CMakeLists.txt
b/cpp/source/rocketmq/tests/CMakeLists.txt
new file mode 100644
index 00000000..f8c80eea
--- /dev/null
+++ b/cpp/source/rocketmq/tests/CMakeLists.txt
@@ -0,0 +1,19 @@
+set(ROCKETMQ_TEST_INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}/../mocks/include
+ ${CMAKE_CURRENT_SOURCE_DIR}/../../client/mocks/include
+ ${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+
+macro(add_rocketmq_test TEST_NAME TEST_SRC)
+ add_executable(${TEST_NAME} ${TEST_SRC})
+ target_include_directories(${TEST_NAME} PRIVATE
${ROCKETMQ_TEST_INCLUDE_DIRS})
+ target_link_libraries(${TEST_NAME} PRIVATE ${ROCKETMQ_TEST_LINK_LIBS})
+ add_test(NAME ${TEST_NAME} COMMAND ${TEST_NAME})
+endmacro()
+
+add_rocketmq_test(client_impl_test ClientImplTest.cpp)
+add_rocketmq_test(consume_message_service_test ConsumeMessageServiceTest.cpp)
+add_rocketmq_test(optional_test OptionalTest.cpp)
+add_rocketmq_test(priority_message_test PriorityMessageTest.cpp)
+add_rocketmq_test(send_context_test SendContextTest.cpp)
+add_rocketmq_test(static_name_server_resolver_test
StaticNameServerResolverTest.cpp)
+add_rocketmq_test(time_test TimeTest.cpp)
diff --git a/cpp/source/scheduler/tests/CMakeLists.txt
b/cpp/source/scheduler/tests/CMakeLists.txt
new file mode 100644
index 00000000..6bbb7b82
--- /dev/null
+++ b/cpp/source/scheduler/tests/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_executable(scheduler_test SchedulerTest.cpp)
+target_include_directories(scheduler_test PRIVATE
${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+target_link_libraries(scheduler_test PRIVATE ${ROCKETMQ_TEST_LINK_LIBS})
+add_test(NAME scheduler_test COMMAND scheduler_test)
diff --git a/cpp/source/stats/MetricBidiReactor.cpp
b/cpp/source/stats/MetricBidiReactor.cpp
index e03e7c61..e899e1dd 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "MetricBidiReactor.h"
+#include "FmtEnumFormatter.h"
#include <chrono>
diff --git a/cpp/source/stats/tests/CMakeLists.txt
b/cpp/source/stats/tests/CMakeLists.txt
new file mode 100644
index 00000000..2bb42d80
--- /dev/null
+++ b/cpp/source/stats/tests/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_executable(publish_stats_test PublishStatsTest.cpp)
+target_include_directories(publish_stats_test PRIVATE
${ROCKETMQ_INTERNAL_INCLUDE_DIRS})
+target_link_libraries(publish_stats_test PRIVATE ${ROCKETMQ_TEST_LINK_LIBS})
+add_test(NAME publish_stats_test COMMAND publish_stats_test)