This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-rust.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ce8924 Use protocol v2
0ce8924 is described below
commit 0ce8924fa2fcb0f15c424d400e83f9a4dc46a5d4
Author: Li Zhanhui <[email protected]>
AuthorDate: Sat Apr 2 08:07:03 2022 +0000
Use protocol v2
---
Cargo.lock | 394 ++++++++++++++++++--
Cargo.toml | 20 +-
build.rs | 4 +-
proto/apache/rocketmq/v1/definition.proto | 343 ------------------
proto/apache/rocketmq/{v1 => v2}/admin.proto | 10 +-
proto/apache/rocketmq/v2/definition.proto | 475 +++++++++++++++++++++++++
proto/apache/rocketmq/{v1 => v2}/service.proto | 363 ++++++-------------
proto/google/rpc/error_details.proto | 247 -------------
proto/google/rpc/status.proto | 47 ---
src/bin/server.rs | 15 +
src/lib.rs | 29 +-
src/pb.rs | 1 +
src/rocketmq.rs | 90 +++++
src/service.rs | 156 ++++++++
14 files changed, 1235 insertions(+), 959 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index a35947b..ff88454 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,13 +3,10 @@
version = 3
[[package]]
-name = "aho-corasick"
-version = "0.7.18"
+name = "adler"
+version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
-dependencies = [
- "memchr",
-]
+checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anyhow"
@@ -56,6 +53,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
+name = "axum"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5611d4977882c5af1c0f7a34d51b5d87f784f86912bb543986b014ea4995ef93"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bitflags",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "serde",
+ "sync_wrapper",
+ "tokio",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95cd109b3e93c9541dcce5b0219dcf89169dcc58c1bebed65082808324258afb"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+]
+
+[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -68,18 +108,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
+name = "bumpalo"
+version = "3.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
+
+[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
+name = "cc"
+version = "1.0.73"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
+
+[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
+name = "cmake"
+version = "0.1.48"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "crc32fast"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -101,18 +171,45 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
+name = "flate2"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
+dependencies = [
+ "cfg-if",
+ "crc32fast",
+ "libc",
+ "miniz_oxide",
+]
+
+[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
@@ -122,6 +219,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -140,6 +243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-core",
+ "futures-sink",
"futures-task",
"pin-project-lite",
"pin-utils",
@@ -183,12 +287,9 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
-version = "0.3.3"
+version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
-dependencies = [
- "unicode-segmentation",
-]
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
@@ -222,6 +323,12 @@ dependencies = [
]
[[package]]
+name = "http-range-header"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
+
+[[package]]
name = "httparse"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -304,6 +411,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
+name = "js-sys"
+version = "0.3.56"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04"
+dependencies = [
+ "wasm-bindgen",
+]
+
+[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -334,12 +450,34 @@ dependencies = [
]
[[package]]
+name = "matchit"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
+
+[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
+[[package]]
+name = "miniz_oxide"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
+dependencies = [
+ "adler",
+ "autocfg",
+]
+
+[[package]]
name = "mio"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -470,6 +608,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
+name = "prettyplease"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b83ec2d0af5c5c556257ff52c9f98934e243b9fd39604bfb2a9b75ec2e97f18"
+dependencies = [
+ "proc-macro2",
+ "syn",
+]
+
+[[package]]
name = "proc-macro2"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -480,9 +628,9 @@ dependencies = [
[[package]]
name = "prost"
-version = "0.9.0"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
+checksum = "1bd5316aa8f5c82add416dfbc25116b84b748a21153f512917e8143640a71bbd"
dependencies = [
"bytes",
"prost-derive",
@@ -490,11 +638,13 @@ dependencies = [
[[package]]
name = "prost-build"
-version = "0.9.0"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
+checksum = "328f9f29b82409216decb172d81e936415d21245befa79cd34c3f29d87d1c50b"
dependencies = [
"bytes",
+ "cfg-if",
+ "cmake",
"heck",
"itertools",
"lazy_static",
@@ -510,9 +660,9 @@ dependencies = [
[[package]]
name = "prost-derive"
-version = "0.9.0"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
+checksum = "df35198f0777b75e9ff669737c6da5136b59dba33cf5a010a6d1cc4d56defc6f"
dependencies = [
"anyhow",
"itertools",
@@ -523,9 +673,9 @@ dependencies = [
[[package]]
name = "prost-types"
-version = "0.9.0"
+version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
+checksum = "926681c118ae6e512a3ccefd4abbe5521a14f4cc1e207356d4d00c0b7f2006fd"
dependencies = [
"bytes",
"prost",
@@ -581,12 +731,10 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.5.4"
+version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
+checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
- "aho-corasick",
- "memchr",
"regex-syntax",
]
@@ -606,23 +754,78 @@ dependencies = [
]
[[package]]
-name = "rocketmq-client-rust"
+name = "ring"
+version = "0.16.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
+dependencies = [
+ "cc",
+ "libc",
+ "once_cell",
+ "spin",
+ "untrusted",
+ "web-sys",
+ "winapi",
+]
+
+[[package]]
+name = "rocketmq"
version = "0.1.0"
dependencies = [
+ "futures",
"prost",
"prost-types",
+ "rustls",
"tokio",
+ "tokio-rustls",
"tonic",
"tonic-build",
]
[[package]]
+name = "rustls"
+version = "0.20.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921"
+dependencies = [
+ "log",
+ "ring",
+ "sct",
+ "webpki",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
+dependencies = [
+ "base64",
+]
+
+[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
+name = "sct"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
+[[package]]
+name = "serde"
+version = "1.0.136"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
+
+[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -654,6 +857,12 @@ dependencies = [
]
[[package]]
+name = "spin"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+
+[[package]]
name = "syn"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -665,6 +874,12 @@ dependencies = [
]
[[package]]
+name = "sync_wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
+
+[[package]]
name = "tempfile"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -720,6 +935,17 @@ dependencies = [
]
[[package]]
+name = "tokio-rustls"
+version = "0.23.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e"
+dependencies = [
+ "rustls",
+ "tokio",
+ "webpki",
+]
+
+[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -760,14 +986,16 @@ dependencies = [
[[package]]
name = "tonic"
-version = "0.6.2"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
+checksum = "9a1a361140b1af3f548e0a5105126b3fc737542f6cd4947b66419c80be07db22"
dependencies = [
"async-stream",
"async-trait",
+ "axum",
"base64",
"bytes",
+ "flate2",
"futures-core",
"futures-util",
"h2",
@@ -779,9 +1007,11 @@ dependencies = [
"pin-project",
"prost",
"prost-derive",
+ "rustls-pemfile",
"tokio",
+ "tokio-rustls",
"tokio-stream",
- "tokio-util 0.6.9",
+ "tokio-util 0.7.0",
"tower",
"tower-layer",
"tower-service",
@@ -791,10 +1021,11 @@ dependencies = [
[[package]]
name = "tonic-build"
-version = "0.6.2"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
+checksum = "4d17087af5c80e5d5fc8ba9878e60258065a0a757e35efe7a05b7904bece1943"
dependencies = [
+ "prettyplease",
"proc-macro2",
"prost-build",
"quote",
@@ -822,6 +1053,25 @@ dependencies = [
]
[[package]]
+name = "tower-http"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8"
+dependencies = [
+ "bitflags",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -883,18 +1133,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
-name = "unicode-segmentation"
-version = "1.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
-
-[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
+[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -911,6 +1161,80 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
+name = "wasm-bindgen"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
+dependencies = [
+ "cfg-if",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca"
+dependencies = [
+ "bumpalo",
+ "lazy_static",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
+
+[[package]]
+name = "web-sys"
+version = "0.3.56"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "webpki"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
+[[package]]
name = "which"
version = "4.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 52064d8..42483bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,15 +1,23 @@
[package]
-name = "rocketmq-client-rust"
+name = "rocketmq"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-tonic = "0.6"
-prost = "0.9"
-prost-types = "0.9"
-tokio = { version = "1.0", features = ["full"]}
+futures = { version = "0.3", default-features = false, features = ["alloc"] }
+tonic = { version = "0.7", features = ["default", "tls", "compression"] }
+prost = "0.10"
+prost-types = "0.10"
+tokio = { version = "1", features = ["full"]}
+tokio-rustls = "0.23"
+rustls = {version = "0.20", features = ["default", "dangerous_configuration"]}
[build-dependencies]
-tonic-build = "0.6"
\ No newline at end of file
+tonic-build = {version = "0.7", features = ["default", "compression"]}
+
+
+[[bin]]
+name = "server"
+path = "src/bin/server.rs"
\ No newline at end of file
diff --git a/build.rs b/build.rs
index d210350..a3f0af4 100644
--- a/build.rs
+++ b/build.rs
@@ -1,8 +1,8 @@
fn main() {
- let idl_files = &["proto/apache/rocketmq/v1/service.proto"];
+ let idl_files = &["proto/apache/rocketmq/v2/service.proto"];
tonic_build::configure()
.build_client(true)
- .build_server(false)
+ .build_server(true)
.compile(idl_files, &["proto"])
.unwrap_or_else(|e| panic!("protoc failed: {}", e));
}
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
deleted file mode 100644
index 898695b..0000000
--- a/proto/apache/rocketmq/v1/definition.proto
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-package apache.rocketmq.v1;
-
-enum Permission {
- NONE = 0;
- READ = 1;
- WRITE = 2;
- READ_WRITE = 3;
-
- reserved 4 to 64;
-}
-
-enum FilterType {
- TAG = 0;
- SQL = 1;
-
- reserved 2 to 64;
-}
-
-message FilterExpression {
- FilterType type = 1;
- string expression = 2;
-
- reserved 3 to 64;
-}
-
-// Dead lettering is done on a best effort basis. The same message might be
-// dead lettered multiple times.
-//
-// If validation on any of the fields fails at subscription creation/update,
-// the create/update subscription request will fail.
-message DeadLetterPolicy {
- // The maximum number of delivery attempts for any message.
- //
- // This field will be honored on a best effort basis.
- //
- // If this parameter is 0, a default value of 16 is used.
- int32 max_delivery_attempts = 1;
-
- reserved 2 to 64;
-}
-
-message Resource {
- string resource_namespace = 1;
-
- // Resource name identifier, which remains unique within the abstract resource
- // namespace.
- string name = 2;
-
- reserved 3 to 64;
-}
-
-enum ConsumeModel {
- CLUSTERING = 0;
- BROADCASTING = 1;
-
- reserved 2 to 64;
-}
-
-message ProducerData {
- Resource group = 1;
-
- reserved 2 to 64;
-}
-
-enum ConsumePolicy {
- RESUME = 0;
- PLAYBACK = 1;
- DISCARD = 2;
- TARGET_TIMESTAMP = 3;
-
- reserved 4 to 64;
-}
-
-enum ConsumeMessageType {
- ACTIVE = 0;
- PASSIVE = 1;
-
- reserved 2 to 64;
-}
-
-message ConsumerData {
- Resource group = 1;
-
- repeated SubscriptionEntry subscriptions = 2;
-
- ConsumeModel consume_model = 3;
-
- ConsumePolicy consume_policy = 4;
-
- DeadLetterPolicy dead_letter_policy = 5;
-
- ConsumeMessageType consume_type = 6;
-
- reserved 7 to 64;
-}
-
-message SubscriptionEntry {
- Resource topic = 1;
- FilterExpression expression = 2;
-
- reserved 3 to 64;
-}
-
-enum AddressScheme {
- IPv4 = 0;
- IPv6 = 1;
- DOMAIN_NAME = 2;
-
- reserved 3 to 64;
-}
-
-message Address {
- string host = 1;
- int32 port = 2;
-
- reserved 3 to 64;
-}
-
-message Endpoints {
- AddressScheme scheme = 1;
- repeated Address addresses = 2;
-
- reserved 3 to 64;
-}
-
-message Broker {
- // Name of the broker
- string name = 1;
-
- // Broker index. Canonically, index = 0 implies that the broker is playing
- // leader role while brokers with index > 0 play follower role.
- int32 id = 2;
-
- // Address of the broker, complying with the following scheme
- // 1. dns:[//authority/]host[:port]
- // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
- // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
- Endpoints endpoints = 3;
-
- reserved 4 to 64;
-}
-
-message Partition {
- Resource topic = 1;
- int32 id = 2;
- Permission permission = 3;
- Broker broker = 4;
-
- reserved 5 to 64;
-}
-
-enum MessageType {
- NORMAL = 0;
-
- // Sequenced message
- FIFO = 1;
-
- // Messages that are delivered after the specified duration.
- DELAY = 2;
-
- // Messages that are transactional. Only committed messages are delivered to
- // subscribers.
- TRANSACTION = 3;
-
- reserved 4 to 64;
-}
-
-enum DigestType {
- // CRC algorithm achieves goal of detecting random data error with lowest
- // computation overhead.
- CRC32 = 0;
-
- // MD5 algorithm achieves good balance between collision rate and computation
- // overhead.
- MD5 = 1;
-
- // SHA-family has substantially fewer collision with fair amount of
- // computation.
- SHA1 = 2;
-
- reserved 3 to 64;
-}
-
-// When publishing messages to or subscribing messages from brokers, clients
-// shall include or validate digests of message body to ensure data integrity.
-//
-// For message publishment, when an invalid digest were detected, brokers need
-// respond client with BAD_REQUEST.
-//
-// For messags subscription, when an invalid digest were detected, consumers
-// need to handle this case according to message type:
-// 1) Standard messages should be negatively acknowledged instantly, causing
-// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
-// previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
-message Digest {
- DigestType type = 1;
- string checksum = 2;
-
- reserved 3 to 64;
-}
-
-enum Encoding {
- IDENTITY = 0;
- GZIP = 1;
-
- reserved 2 to 64;
-}
-
-message SystemAttribute {
- // Tag
- string tag = 1;
-
- // Message keys
- repeated string keys = 2;
-
- // Message identifier, client-side generated, remains unique.
- // if message_id is empty, the send message request will be aborted with
- // status `INVALID_ARGUMENT`
- string message_id = 3;
-
- // Message body digest
- Digest body_digest = 4;
-
- // Message body encoding. Candidate options are identity, gzip, snappy etc.
- Encoding body_encoding = 5;
-
- // Message type, normal, FIFO or transactional.
- MessageType message_type = 6;
-
- // Message born time-point.
- google.protobuf.Timestamp born_timestamp = 7;
-
- // Message born host. Valid options are IPv4, IPv6 or client host domain name.
- string born_host = 8;
-
- // Time-point at which the message is stored in the broker.
- google.protobuf.Timestamp store_timestamp = 9;
-
- // The broker that stores this message. It may be name, IP or arbitrary
- // identifier that uniquely identify the broker.
- string store_host = 10;
-
- oneof timed_delivery {
- // Time-point at which broker delivers to clients.
- google.protobuf.Timestamp delivery_timestamp = 11;
-
- // Level-based delay strategy.
- int32 delay_level = 12;
- }
-
- // If a message is acquired by way of POP, this field holds the receipt.
- // Clients use the receipt to acknowledge or negatively acknowledge the
- // message.
- string receipt_handle = 13;
-
- // Partition identifier in which a message is physically stored.
- int32 partition_id = 14;
-
- // Partition offset at which a message is stored.
- int64 partition_offset = 15;
-
- // Period of time servers would remain invisible once a message is acquired.
- google.protobuf.Duration invisible_period = 16;
-
- // Business code may failed to process messages for the moment. Hence, clients
- // may request servers to deliver them again using certain back-off strategy,
- // the attempt is 1 not 0 if message is delivered first time.
- int32 delivery_attempt = 17;
-
- // Message producer load-balance group if applicable.
- Resource producer_group = 18;
-
- string message_group = 19;
-
- // Trace context.
- string trace_context = 20;
-
- // Delay time of first recover orphaned transaction request from server.
- google.protobuf.Duration orphaned_transaction_recovery_period = 21;
-
- reserved 22 to 64;
-}
-
-message Message {
-
- Resource topic = 1;
-
- // User defined key-value pairs.
- // If user_attribute contains the reserved keys by RocketMQ,
- // the send message request will be aborted with status `INVALID_ARGUMENT`.
- // See below links for the reserved keys
- // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
- map<string, string> user_attribute = 2;
-
- SystemAttribute system_attribute = 3;
-
- bytes body = 4;
-
- reserved 5 to 64;
-}
-
-message Assignment {
- Partition Partition = 1;
-
- reserved 2 to 64;
-}
-
-enum QueryOffsetPolicy {
- // Use this option if client wishes to playback all existing messages.
- BEGINNING = 0;
-
- // Use this option if client wishes to skip all existing messages.
- END = 1;
-
- // Use this option if time-based seek is targeted.
- TIME_POINT = 2;
-
- reserved 3 to 64;
-}
\ No newline at end of file
diff --git a/proto/apache/rocketmq/v1/admin.proto b/proto/apache/rocketmq/v2/admin.proto
similarity index 78%
rename from proto/apache/rocketmq/v1/admin.proto
rename to proto/apache/rocketmq/v2/admin.proto
index b452e97..7dbb702 100644
--- a/proto/apache/rocketmq/v1/admin.proto
+++ b/proto/apache/rocketmq/v2/admin.proto
@@ -15,7 +15,15 @@
syntax = "proto3";
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
message ChangeLogLevelRequest {
enum Level {
diff --git a/proto/apache/rocketmq/v2/definition.proto b/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..cd19a27
--- /dev/null
+++ b/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+ TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+ COMMIT = 1;
+ ROLLBACK = 2;
+}
+
+enum TransactionSource {
+ SOURCE_UNSPECIFIED = 0;
+ SOURCE_CLIENT = 1;
+ SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+ PERMISSION_UNSPECIFIED = 0;
+ NONE = 1;
+ READ = 2;
+ WRITE = 3;
+ READ_WRITE = 4;
+}
+
+enum FilterType {
+ FILTER_TYPE_UNSPECIFIED = 0;
+ TAG = 1;
+ SQL = 2;
+}
+
+message FilterExpression {
+ FilterType type = 1;
+ string expression = 2;
+}
+
+// Dead lettering is done on a best effort basis. The same message might be
+// dead lettered multiple times.
+//
+// If validation on any of the fields fails at subscription creation/update,
+// the create/update subscription request will fail.
+message DeadLetterPolicy {
+ // The maximum number of delivery attempts for any message.
+ //
+ // This field will be honored on a best effort basis.
+ //
+ // If this parameter is 0, a default value of 16 is used.
+ int32 max_delivery_attempts = 1;
+}
+
+message RetryPolicy {
+ int32 max_attempts = 1;
+ float initial_backoff = 2;
+ float max_backoff = 3;
+ float backoff_multiplier = 4;
+}
+
+message Resource {
+ string resource_namespace = 1;
+
+ // Resource name identifier, which remains unique within the abstract resource
+ // namespace.
+ string name = 2;
+}
+
+enum ConsumeMessageType {
+ CONSUME_MESSAGE_TYPE_UNSPECIFIED = 0;
+ ACTIVE = 1;
+ PASSIVE = 2;
+}
+
+message Trace {
+ bool on = 1;
+ Endpoints service_access_point = 2;
+}
+
+enum AuthenticationMethod {
+ AUTHENTICATION_METHOD_UNSPECIFIED = 0;
+ SASL = 1;
+ MUTUAL_TLS = 2;
+ HTTP_BASIC_AUTH = 3;
+}
+
+message Authentication {
+ AuthenticationMethod method = 1;
+ string identity = 2;
+}
+
+// Transport
+message Timeout {
+ google.protobuf.Duration connect = 1;
+
+ google.protobuf.Duration request = 2;
+
+ // Long polling duration
+ google.protobuf.Duration polling = 3;
+}
+
+message Publish {
+ // Publisher normally registers topics in interest, such that
+ // pre-conditions may be examined and validated.
+ repeated Resource topics = 1;
+
+ // If a transactional message stays unresolved for more than
+ // `transaction_orphan_threshold`, it would be regarded as an
+ // orphan. Servers that manage orphan messages would pick up
+ // a capable publisher to resolve it.
+ google.protobuf.Duration transaction_orphan_threshold = 2;
+
+ // If publishing message experiences RPC failure, `retry_policy` describes
+ // backoff policy before retries are made.
+ RetryPolicy retry_policy = 3;
+
+ // If message body size exceeds `compress_threshold`, it would be desirable to
+ // compress it to relieve network overhead.
+ int32 compress_threshold = 4;
+
+ // Max message size in bytes permitted by server.
+ int32 max_message_bytes = 5;
+}
+
+message CacheLimits {
+ int32 count = 1;
+ int64 bytes = 2;
+}
+
+message Subscription {
+ Resource group = 1;
+
+ repeated SubscriptionEntry subscriptions = 2;
+
+ DeadLetterPolicy dead_letter_policy = 3;
+
+ ConsumeMessageType consume_type = 4;
+
+ bool fifo = 5;
+
+ // For RPC
+ RetryPolicy retry_policy = 6;
+
+ // For PushConsumer
+ RetryPolicy consume_backoff_policy = 7;
+
+ int32 max_receive_batch_size = 8;
+
+ // After messages are received from servers, consumers normally split them
+ // into multiple mini-batches. Each mini-batch is assigned to a dedicated
+ // task, which will be submitted to thread-pool to run concurrently.
+ int32 messages_per_task = 9;
+
+ CacheLimits cache_limits = 10;
+
+ int32 consume_thread_count = 11;
+
+ // Up to server
+ google.protobuf.Duration max_invisible_duration = 12;
+}
+
+message SubscriptionEntry {
+ Resource topic = 1;
+ FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+ ADDRESS_SCHEME_UNSPECIFIED = 0;
+ IPv4 = 1;
+ IPv6 = 2;
+ DOMAIN_NAME = 3;
+}
+
+message Address {
+ string host = 1;
+ int32 port = 2;
+}
+
+message Endpoints {
+ AddressScheme scheme = 1;
+ repeated Address addresses = 2;
+}
+
+message Broker {
+ // Name of the broker
+ string name = 1;
+
+ // Broker index. Canonically, index = 0 implies that the broker is playing
+ // leader role while brokers with index > 0 play follower role.
+ int32 id = 2;
+
+ // Address of the broker, complying with the following scheme
+ // 1. dns:[//authority/]host[:port]
+ // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+ // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+ Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+ Resource topic = 1;
+ int32 id = 2;
+ Permission permission = 3;
+ Broker broker = 4;
+ repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+ MESSAGE_TYPE_UNSPECIFIED = 0;
+
+ NORMAL = 1;
+
+ // Sequenced message
+ FIFO = 2;
+
+ // Messages that are delivered after the specified duration.
+ DELAY = 3;
+
+ // Messages that are transactional. Only committed messages are delivered to
+ // subscribers.
+ TRANSACTION = 4;
+}
+
+enum DigestType {
+ DIGEST_TYPE_UNSPECIFIED = 0;
+
+ // CRC algorithm achieves goal of detecting random data error with lowest
+ // computation overhead.
+ CRC32 = 1;
+
+ // MD5 algorithm achieves good balance between collision rate and computation
+ // overhead.
+ MD5 = 2;
+
+ // SHA-family has substantially fewer collision with fair amount of
+ // computation.
+ SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest is detected, brokers need to
+// respond to the client with BAD_REQUEST.
+//
+// For message subscription, when an invalid digest is detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+ DigestType type = 1;
+ string checksum = 2;
+}
+
+enum Encoding {
+ ENCODING_UNSPECIFIED = 0;
+
+ IDENTITY = 1;
+
+ GZIP = 2;
+}
+
+message SystemProperties {
+ // Tag
+ string tag = 1;
+
+ // Message keys
+ repeated string keys = 2;
+
+ // Message identifier, client-side generated, remains unique.
+ // if message_id is empty, the send message request will be aborted with
+ // status `INVALID_ARGUMENT`
+ string message_id = 3;
+
+ // Message body digest
+ Digest body_digest = 4;
+
+ // Message body encoding. Candidate options are identity, gzip, snappy etc.
+ Encoding body_encoding = 5;
+
+ // Message type, normal, FIFO or transactional.
+ MessageType message_type = 6;
+
+ // Message born time-point.
+ google.protobuf.Timestamp born_timestamp = 7;
+
+ // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+ string born_host = 8;
+
+ // Time-point at which the message is stored in the broker.
+ google.protobuf.Timestamp store_timestamp = 9;
+
+ // The broker that stores this message. It may be broker name, IP or arbitrary
+ // identifier that uniquely identify the server.
+ string store_host = 10;
+
+ // Time-point at which broker delivers to clients.
+ google.protobuf.Timestamp delivery_timestamp = 11;
+
+ // If a message is acquired by way of POP, this field holds the receipt.
+ // Clients use the receipt to acknowledge or negatively acknowledge the
+ // message.
+ string receipt_handle = 12;
+
+ // Message queue identifier in which a message is physically stored.
+ int32 queue_id = 13;
+
+ // Message-queue offset at which a message is stored.
+ int64 queue_offset = 14;
+
+ // Period of time the message would remain invisible once it is acquired.
+ google.protobuf.Duration invisible_duration = 15;
+
+ // Business code may fail to process messages for the moment. Hence, clients
+ // may request servers to deliver them again using certain back-off strategy,
+ // the attempt is 1 not 0 if message is delivered first time.
+ int32 delivery_attempt = 16;
+
+ string message_group = 17;
+
+ // Trace context.
+ string trace_context = 18;
+
+ // Delay time of first recover orphaned transaction request from server.
+ google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+
+ Resource topic = 1;
+
+ // User defined key-value pairs.
+ // If user_properties contain the reserved keys by RocketMQ,
+ // the send message request will be aborted with status `INVALID_ARGUMENT`.
+ // See below links for the reserved keys
+ // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+ map<string, string> user_properties = 2;
+
+ SystemProperties system_properties = 3;
+
+ bytes body = 4;
+}
+
+message Assignment { MessageQueue message_queue = 1; }
+
+enum QueryOffsetPolicy {
+ QUERY_OFFSET_POLICY_UNSPECIFIED = 0;
+
+ // Use this option if client wishes to playback all existing messages.
+ BEGINNING = 1;
+
+ // Use this option if client wishes to skip all existing messages.
+ END = 2;
+
+ // Use this option if time-based seek is targeted.
+ TIME_POINT = 3;
+}
+
+message SendReceipt {
+ string message_id = 1;
+ string transaction_id = 2;
+}
+
+enum Code {
+ // Success.
+ OK = 0;
+ // Format of access point is illegal.
+ ILLEGAL_ACCESS_POINT = 1;
+ // Format of topic is illegal.
+ ILLEGAL_TOPIC = 2;
+ // Format of consumer group is illegal.
+ ILLEGAL_CONSUMER_GROUP = 3;
+ // Format of message tag is illegal.
+ ILLEGAL_MESSAGE_TAG = 4;
+ // Format of message key is illegal.
+ ILLEGAL_MESSAGE_KEY = 5;
+ // Size of message keys exceeds the threshold.
+ MESSAGE_KEYS_TOO_LARGE = 6;
+ // Format of message group is illegal.
+ ILLEGAL_MESSAGE_GROUP = 7;
+ // Format of message property key is illegal.
+ ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 9;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 10;
+
+ // User does not have the permission to operate.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
+ FORBIDDEN = 403;
+
+ // Code indicates that the client request has not been completed
+ // because it lacks valid authentication credentials for the
+ // requested resource.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
+ UNAUTHORIZED = 401;
+
+ // Topic resource does not exist.
+ TOPIC_NOT_FOUND = 13;
+
+ // Consumer group resource does not exist.
+ CONSUMER_GROUP_NOT_FOUND = 14;
+
+ // Not allowed to verify message. Chances are that you are verifying
+ // a FIFO message, as is violating FIFO semantics.
+ VERIFY_MESSAGE_FORBIDDEN = 15;
+
+ // Failed to consume message.
+ FAILED_TO_CONSUME_MESSAGE = 16;
+
+ // Message is corrupted.
+ MESSAGE_CORRUPTED = 17;
+
+ // Too many requests are made in short period of duration.
+ // Requests are throttled.
+ TOO_MANY_REQUESTS = 18;
+
+ // Expired receipt-handle is used when trying to acknowledge or change
+ // invisible duration of a message
+ RECEIPT_HANDLE_EXPIRED = 19;
+
+ // Message property does not match the message type.
+ MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+
+ // Code indicates that the server encountered an unexpected condition
+ // that prevented it from fulfilling the request.
+ // This error response is a generic "catch-all" response.
+ // Usually, this indicates the server cannot find a better alternative
+ // error code to response. Sometimes, server administrators log error
+ // responses like the 500 status code with more details about the request
+ // to prevent the error from happening again in the future.
+ //
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+ INTERNAL_SERVER_ERROR = 500;
+
+ // Code means that the server or client does not support the functionality
+ // required to fulfill the request.
+ NOT_IMPLEMENTED = 501;
+
+ // Code indicates that the server, while acting as a gateway or proxy,
+ // did not get a response in time from the upstream server that
+ // it needed in order to complete the request.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+ GATEWAY_TIMEOUT = 504;
+}
+
+message Status {
+ Code code = 1;
+ string message = 2;
+}
\ No newline at end of file
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v2/service.proto
similarity index 55%
rename from proto/apache/rocketmq/v1/service.proto
rename to proto/apache/rocketmq/v2/service.proto
index 77fa63f..f01c55d 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v2/service.proto
@@ -17,35 +17,29 @@ syntax = "proto3";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
-import "apache/rocketmq/v1/definition.proto";
+import "apache/rocketmq/v2/definition.proto";
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
-message ResponseCommon {
- google.rpc.Status status = 1;
- google.rpc.RequestInfo request_info = 2;
- google.rpc.Help help = 3;
- google.rpc.RetryInfo retry_info = 4;
- google.rpc.DebugInfo debug_info = 5;
- google.rpc.ErrorInfo error_info = 6;
-
- reserved 7 to 64;
-}
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
// Topics are destination of messages to publish to or subscribe from. Similar
// to domain names, they will be addressable after resolution through the
// provided access point.
//
// Access points are usually the addresses of name servers, which fulfill
-// service discovery, load-balancing and other auxillary services. Name servers
+// service discovery, load-balancing and other auxiliary services. Name servers
// receive periodic heartbeats from affiliate brokers and erase those which
// failed to maintain alive status.
//
// Name servers answer queries of QueryRouteRequest, responding clients with
-// addressable partitions, which they may directly publish messages to or
+// addressable message-queues, which they may directly publish messages to or
// subscribe messages from.
//
// QueryRouteRequest shall include source endpoints, aka, configured
@@ -56,31 +50,22 @@ message QueryRouteRequest {
Resource topic = 1;
Endpoints endpoints = 2;
-
- reserved 3 to 64;
}
message QueryRouteResponse {
- ResponseCommon common = 1;
+ Status status = 1;
- repeated Partition partitions = 2;
-
- reserved 3 to 64;
+ repeated MessageQueue message_queues = 2;
}
message SendMessageRequest {
- Message message = 1;
- Partition partition = 2;
-
- reserved 3 to 64;
+ repeated Message messages = 1;
+ MessageQueue message_queue = 2;
}
message SendMessageResponse {
- ResponseCommon common = 1;
- string message_id = 2;
- string transaction_id = 3;
-
- reserved 4 to 64;
+ Status status = 1;
+ repeated SendReceipt receipts = 2;
}
message QueryAssignmentRequest {
@@ -90,77 +75,41 @@ message QueryAssignmentRequest {
// Service access point
Endpoints endpoints = 4;
-
- reserved 5 to 64;
}
message QueryAssignmentResponse {
- ResponseCommon common = 1;
+ Status status = 1;
repeated Assignment assignments = 2;
-
- reserved 3 to 64;
}
message ReceiveMessageRequest {
Resource group = 1;
string client_id = 2;
- Partition partition = 3;
+ MessageQueue message_queue = 3;
FilterExpression filter_expression = 4;
- ConsumePolicy consume_policy = 5;
- google.protobuf.Timestamp initialization_timestamp = 6;
- int32 batch_size = 7;
- google.protobuf.Duration invisible_duration = 8;
- google.protobuf.Duration await_time = 9;
- bool fifo_flag = 10;
-
- reserved 11 to 64;
+ google.protobuf.Timestamp initialization_timestamp = 5;
+ int32 batch_size = 6;
+ google.protobuf.Duration invisible_duration = 7;
+ google.protobuf.Duration await_duration = 8;
+ bool fifo = 9;
}
message ReceiveMessageResponse {
- ResponseCommon common = 1;
+ Status status = 1;
repeated Message messages = 2;
google.protobuf.Timestamp delivery_timestamp = 3;
google.protobuf.Duration invisible_duration = 4;
-
- reserved 5 to 64;
}
message AckMessageRequest {
Resource group = 1;
Resource topic = 2;
string client_id = 3;
- oneof handle {
- string receipt_handle = 4;
- int64 offset = 5;
- }
- string message_id = 6;
-
- reserved 7 to 64;
-}
-
-message AckMessageResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message NackMessageRequest {
- Resource group = 1;
- Resource topic = 2;
- string client_id = 3;
string receipt_handle = 4;
string message_id = 5;
- int32 delivery_attempt = 6;
- int32 max_delivery_attempts = 7;
-
- reserved 8 to 64;
}
-message NackMessageResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
+message AckMessageResponse { Status status = 1; }
message ForwardMessageToDeadLetterQueueRequest {
Resource group = 1;
@@ -170,196 +119,130 @@ message ForwardMessageToDeadLetterQueueRequest {
string message_id = 5;
int32 delivery_attempt = 6;
int32 max_delivery_attempts = 7;
-
- reserved 8 to 64;
}
-message ForwardMessageToDeadLetterQueueResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
message HeartbeatRequest {
string client_id = 1;
- oneof client_data {
- ProducerData producer_data = 2;
- ConsumerData consumer_data = 3;
- }
- bool fifo_flag = 4;
-
- reserved 5 to 64;
-}
-
-message HeartbeatResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message HealthCheckRequest {
- Resource group = 1;
- string client_host = 2;
-
- reserved 3 to 64;
+ Resource group = 2;
}
-message HealthCheckResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
+message HeartbeatResponse { Status status = 1; }
message EndTransactionRequest {
Resource group = 1;
string message_id = 2;
string transaction_id = 3;
- enum TransactionResolution {
- COMMIT = 0;
- ROLLBACK = 1;
- }
TransactionResolution resolution = 4;
- enum Source {
- CLIENT = 0;
- SERVER_CHECK = 1;
- }
- Source source = 5;
+ TransactionSource source = 5;
string trace_context = 6;
-
- reserved 7 to 64;
}
-message EndTransactionResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
+message EndTransactionResponse { Status status = 1; }
message QueryOffsetRequest {
- Partition partition = 1;
+ MessageQueue message_queue = 1;
QueryOffsetPolicy policy = 2;
google.protobuf.Timestamp time_point = 3;
-
- reserved 4 to 64;
}
message QueryOffsetResponse {
- ResponseCommon common = 1;
+ Status status = 1;
int64 offset = 2;
-
- reserved 3 to 64;
}
message PullMessageRequest {
Resource group = 1;
- Partition partition = 2;
+ MessageQueue message_queue = 2;
int64 offset = 3;
int32 batch_size = 4;
google.protobuf.Duration await_time = 5;
FilterExpression filter_expression = 6;
string client_id = 7;
-
- reserved 8 to 64;
}
message PullMessageResponse {
- ResponseCommon common = 1;
+ Status status = 1;
int64 min_offset = 2;
int64 next_offset = 3;
int64 max_offset = 4;
repeated Message messages = 5;
-
- reserved 6 to 64;
}
-message NoopCommand { reserved 1 to 64; }
-
-message PrintThreadStackTraceCommand {
- string command_id = 1;
+message PrintThreadStackTraceCommand { int64 command_id = 1; }
- reserved 2 to 64;
-}
-
-message ReportThreadStackTraceRequest {
- string command_id = 1;
+message ThreadStackTrace {
+ int64 command_id = 1;
string thread_stack_trace = 2;
-
- reserved 3 to 64;
}
-message ReportThreadStackTraceResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
- string command_id = 1;
+message VerifyMessageCommand {
+ int64 command_id = 1;
Message message = 2;
-
- reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultRequest {
- string command_id = 1;
- google.rpc.Status status = 2;
-
- reserved 3 to 64;
}
-message ReportMessageConsumptionResultResponse {
- ResponseCommon common = 1;
-
- reserved 2 to 64;
+message VerifyMessageResult {
+ int64 command_id = 1;
+ Status status = 2;
}
message RecoverOrphanedTransactionCommand {
- Message orphaned_transactional_message = 1;
- string transaction_id = 2;
-
- reserved 3 to 64;
+ int64 command_id = 1;
+ Message orphaned_transactional_message = 2;
+ string transaction_id = 3;
}
-message PollCommandRequest {
+message Settings {
string client_id = 1;
- repeated Resource topics = 2;
- oneof group {
- Resource producer_group = 3;
- Resource consumer_group = 4;
- }
-
- reserved 5 to 64;
+ string access_point = 2;
+ Publish publish = 3;
+ Subscription subscription = 4;
+ Authentication authentication = 5;
}
-message PollCommandResponse {
- oneof type {
- // Default command when no new command need to be delivered.
- NoopCommand noop_command = 1;
+message TelemetryCommand {
+ oneof command {
+ Settings settings = 1;
+
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
+
// Request client to print thread stack trace.
- PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
+
+ ThreadStackTrace thread_stack_trace = 4;
+
// Request client to verify the consumption of the appointed message.
- VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
- // Request client to recover the orphaned transaction message.
- RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
- }
+ VerifyMessageCommand verify_message_command = 5;
- reserved 5 to 64;
+ VerifyMessageResult verify_message_result = 6;
+ }
}
message NotifyClientTerminationRequest {
- oneof group {
- Resource producer_group = 1;
- Resource consumer_group = 2;
- }
- string client_id = 3;
+ Resource group = 1;
+ string client_id = 2;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+ Resource group = 1;
+ Resource topic = 2;
+
+ // Unique receipt handle to identify message to change
+ string receipt_handle = 3;
- reserved 4 to 64;
+ // New invisible duration
+ google.protobuf.Duration invisible_duration = 4;
}
-message NotifyClientTerminationResponse {
- ResponseCommon common = 1;
+message ChangeInvisibleDurationResponse {
+ Status status = 1;
- reserved 2 to 64;
+ // Server may generate a new receipt handle for the message.
+ string receipt_handle = 2;
}
// For all the RPCs in MessagingService, the following error handling policies
@@ -374,10 +257,10 @@ message NotifyClientTerminationResponse {
// errors raise, return a response with common.status.code == `INTERNAL`.
service MessagingService {
- // Querys the route entries of the requested topic in the perspective of the
+ // Queries the route entries of the requested topic in the perspective of the
// given endpoints. On success, servers should return a collection of
- // addressable partitions. Note servers may return customized route entries
- // based on endpoints provided.
+ // addressable message-queues. Note servers may return customized route
+ // entries based on endpoints provided.
//
// If the requested topic doesn't exist, returns `NOT_FOUND`.
// If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
@@ -393,14 +276,9 @@ service MessagingService {
// returns `INVALID_ARGUMENT`
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
- // Checks the health status of message server, returns `OK` if services are
- // online and serving. Clients may use this RPC to detect availability of
- // messaging service, and take isolation actions when necessary.
- rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
-
// Delivers messages to brokers.
// Clients may further:
- // 1. Refine a message destination to topic partition which fulfills parts of
+ // 1. Refine a message destination to message-queues which fulfills parts of
// FIFO semantic;
// 2. Flag a message as transactional, which keeps it invisible to consumers
// until it commits;
@@ -413,16 +291,16 @@ service MessagingService {
// If the destination topic doesn't exist, returns `NOT_FOUND`.
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
- // Querys the assigned partition route info of a topic for current consumer,
- // the returned assignment result is descided by server-side load balacner.
+ // Queries the assigned route info of a topic for current consumer,
+ // the returned assignment result is decided by server-side load balancer.
//
// If the corresponding topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
rpc QueryAssignment(QueryAssignmentRequest)
returns (QueryAssignmentResponse) {}
// Receives messages from the server in batch manner, returns a set of
- // messages if success. The received messages should be acked or uacked after
+ // messages if success. The received messages should be acked or nacked after
// processed.
//
// If the pending concurrent receive requests exceed the quota of the given
@@ -442,14 +320,6 @@ service MessagingService {
// `INVALID_ARGUMENT`.
rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
- // Signals that the message has not been successfully processed. The message
- // server should resend the message follow the retry policy defined at
- // server-side.
- //
- // If the corresponding topic or consumer group doesn't exist, returns
- // `NOT_FOUND`.
- rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
-
// Forwards one message to dead letter queue if the DeadLetterPolicy is
// triggered by this message at client-side, return `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
@@ -458,14 +328,14 @@ service MessagingService {
// Commits or rollback one transactional message.
rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
- // Querys the offset of the specific partition, returns the offset with `OK`
- // if success. The message server should maintain a numerical offset for each
- // message in a parition.
+ // Queries the offset of the specific message queue, returns the offset with
+ // `OK` if success. The message server should maintain a numerical offset for
+ // each message in a message-queue.
rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
- // Pulls messages from the specific partition, returns a set of messages with
- // next pull offset. The pulled messages can't be acked or nacked, while the
- // client is responsible for manage offesets for consumer, typically update
+ // Pulls messages from the specific message-queue, returns a set of messages
+ // with next pull offset. The pulled messages can't be acked or nacked, while
+ // the client is responsible for manage offsets for consumer, typically update
// consume offset to local memory or a third-party storage service.
//
// If the pending concurrent receive requests exceed the quota of the given
@@ -476,35 +346,24 @@ service MessagingService {
// Please note that client may suffer from false empty responses.
rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
- // Multiplexing RPC(s) for various polling requests, which issue different
- // commands to client.
- //
- // Sometimes client may need to receive and process the command from server.
- // To prevent the complexity of streaming RPC(s), a unary RPC using
- // long-polling is another solution.
+ // Once a client starts, it would immediately establish bi-lateral stream
+ // RPCs with brokers, reporting its settings as the initiative command.
//
- // To mark the request-response of corresponding command, `command_id` in
- // message is recorded in the subsequent RPC(s). For example, after receiving
- // command of printing thread stack trace, client would send
- // `ReportMessageConsumptionResultRequest` to server, which contain both of
- // the stack trace and `command_id`.
- //
- // At same time, `NoopCommand` is delivered from server when no new command is
- // needed, it is essential for client to maintain the ping-pong.
- //
- rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
- // After receiving the corresponding polling command, the thread stack trace
- // is reported to the server.
- rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
- returns (ReportThreadStackTraceResponse) {}
-
- // After receiving the corresponding polling command, the consumption result
- // of appointed message is reported to the server.
- rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
- returns (ReportMessageConsumptionResultResponse) {}
+ // When servers have need of inspecting client status, they would issue
+ // telemetry commands to clients. After executing received instructions,
+ // clients shall report command execution results through client-side streams.
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
// Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest)
returns (NotifyClientTerminationResponse) {}
+
+ // Once a message is retrieved from consume queue on behalf of the group, it
+ // will be kept invisible to other clients of the same group for a period of
+ // time. The message is supposed to be processed within the invisible
+ // duration. If the client, which is in charge of the invisible message, is
+ // not capable of processing the message timely, it may use
+ // ChangeInvisibleDuration to lengthen invisible duration.
+ rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest)
+ returns (ChangeInvisibleDurationResponse) {}
}
\ No newline at end of file
diff --git a/proto/google/rpc/error_details.proto b/proto/google/rpc/error_details.proto
deleted file mode 100644
index aca01eb..0000000
--- a/proto/google/rpc/error_details.proto
+++ /dev/null
@@ -1,247 +0,0 @@
-// Copyright 2020 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.rpc;
-
-import "google/protobuf/duration.proto";
-
-option go_package = "google.golang.org/genproto/googleapis/rpc/errdetails;errdetails";
-option java_multiple_files = true;
-option java_outer_classname = "ErrorDetailsProto";
-option java_package = "com.google.rpc";
-option objc_class_prefix = "RPC";
-
-// Describes when the clients can retry a failed request. Clients could ignore
-// the recommendation here or retry when this information is missing from error
-// responses.
-//
-// It's always recommended that clients should use exponential backoff when
-// retrying.
-//
-// Clients should wait until `retry_delay` amount of time has passed since
-// receiving the error response before retrying. If retrying requests also
-// fail, clients should use an exponential backoff scheme to gradually increase
-// the delay between retries based on `retry_delay`, until either a maximum
-// number of retries have been reached or a maximum retry delay cap has been
-// reached.
-message RetryInfo {
- // Clients should wait at least this long between retrying the same request.
- google.protobuf.Duration retry_delay = 1;
-}
-
-// Describes additional debugging info.
-message DebugInfo {
- // The stack trace entries indicating where the error occurred.
- repeated string stack_entries = 1;
-
- // Additional debugging information provided by the server.
- string detail = 2;
-}
-
-// Describes how a quota check failed.
-//
-// For example if a daily limit was exceeded for the calling project,
-// a service could respond with a QuotaFailure detail containing the project
-// id and the description of the quota limit that was exceeded. If the
-// calling project hasn't enabled the service in the developer console, then
-// a service could respond with the project id and set `service_disabled`
-// to true.
-//
-// Also see RetryInfo and Help types for other details about handling a
-// quota failure.
-message QuotaFailure {
- // A message type used to describe a single quota violation. For example, a
- // daily quota or a custom quota that was exceeded.
- message Violation {
- // The subject on which the quota check failed.
- // For example, "clientip:<ip address of client>" or "project:<Google
- // developer project id>".
- string subject = 1;
-
- // A description of how the quota check failed. Clients can use this
- // description to find more about the quota configuration in the service's
- // public documentation, or find the relevant quota limit to adjust through
- // developer console.
- //
- // For example: "Service disabled" or "Daily Limit for read operations
- // exceeded".
- string description = 2;
- }
-
- // Describes all quota violations.
- repeated Violation violations = 1;
-}
-
-// Describes the cause of the error with structured details.
-//
-// Example of an error when contacting the "pubsub.googleapis.com" API when it
-// is not enabled:
-// { "reason": "API_DISABLED"
-// "domain": "googleapis.com"
-// "metadata": {
-// "resource": "projects/123",
-// "service": "pubsub.googleapis.com"
-// }
-// }
-//
-// This response indicates that the pubsub.googleapis.com API is not enabled.
-//
-// Example of an error that is returned when attempting to create a Spanner
-// instance in a region that is out of stock:
-// { "reason": "STOCKOUT"
-// "domain": "spanner.googleapis.com",
-// "metadata": {
-// "availableRegions": "us-central1,us-east2"
-// }
-// }
-message ErrorInfo {
- // The reason of the error. This is a constant value that identifies the
- // proximate cause of the error. Error reasons are unique within a particular
- // domain of errors. This should be at most 63 characters and match
- // /[A-Z0-9_]+/.
- string reason = 1;
-
- // The logical grouping to which the "reason" belongs. The error domain
- // is typically the registered service name of the tool or product that
- // generates the error. Example: "pubsub.googleapis.com". If the error is
- // generated by some common infrastructure, the error domain must be a
- // globally unique value that identifies the infrastructure. For Google API
- // infrastructure, the error domain is "googleapis.com".
- string domain = 2;
-
- // Additional structured details about this error.
- //
- // Keys should match /[a-zA-Z0-9-_]/ and be limited to 64 characters in
- // length. When identifying the current value of an exceeded limit, the units
- // should be contained in the key, not the value. For example, rather than
- // {"instanceLimit": "100/request"}, should be returned as,
- // {"instanceLimitPerRequest": "100"}, if the client exceeds the number of
- // instances that can be created in a single (batch) request.
- map<string, string> metadata = 3;
-}
-
-// Describes what preconditions have failed.
-//
-// For example, if an RPC failed because it required the Terms of Service to be
-// acknowledged, it could list the terms of service violation in the
-// PreconditionFailure message.
-message PreconditionFailure {
- // A message type used to describe a single precondition failure.
- message Violation {
- // The type of PreconditionFailure. We recommend using a service-specific
- // enum type to define the supported precondition violation subjects. For
- // example, "TOS" for "Terms of Service violation".
- string type = 1;
-
- // The subject, relative to the type, that failed.
- // For example, "google.com/cloud" relative to the "TOS" type would indicate
- // which terms of service is being referenced.
- string subject = 2;
-
- // A description of how the precondition failed. Developers can use this
- // description to understand how to fix the failure.
- //
- // For example: "Terms of service not accepted".
- string description = 3;
- }
-
- // Describes all precondition violations.
- repeated Violation violations = 1;
-}
-
-// Describes violations in a client request. This error type focuses on the
-// syntactic aspects of the request.
-message BadRequest {
- // A message type used to describe a single bad request field.
- message FieldViolation {
- // A path leading to a field in the request body. The value will be a
- // sequence of dot-separated identifiers that identify a protocol buffer
- // field. E.g., "field_violations.field" would identify this field.
- string field = 1;
-
- // A description of why the request element is bad.
- string description = 2;
- }
-
- // Describes all violations in a client request.
- repeated FieldViolation field_violations = 1;
-}
-
-// Contains metadata about the request that clients can attach when filing a bug
-// or providing other forms of feedback.
-message RequestInfo {
- // An opaque string that should only be interpreted by the service generating
- // it. For example, it can be used to identify requests in the service's logs.
- string request_id = 1;
-
- // Any data that was used to serve this request. For example, an encrypted
- // stack trace that can be sent back to the service provider for debugging.
- string serving_data = 2;
-}
-
-// Describes the resource that is being accessed.
-message ResourceInfo {
- // A name for the type of resource being accessed, e.g. "sql table",
- // "cloud storage bucket", "file", "Google calendar"; or the type URL
- // of the resource: e.g. "type.googleapis.com/google.pubsub.v1.Topic".
- string resource_type = 1;
-
- // The name of the resource being accessed. For example, a shared calendar
- // name: "[email protected]", if the current
- // error is [google.rpc.Code.PERMISSION_DENIED][google.rpc.Code.PERMISSION_DENIED].
- string resource_name = 2;
-
- // The owner of the resource (optional).
- // For example, "user:<owner email>" or "project:<Google developer project
- // id>".
- string owner = 3;
-
- // Describes what error is encountered when accessing this resource.
- // For example, updating a cloud project may require the `writer` permission
- // on the developer console project.
- string description = 4;
-}
-
-// Provides links to documentation or for performing an out of band action.
-//
-// For example, if a quota check failed with an error indicating the calling
-// project hasn't enabled the accessed service, this can contain a URL pointing
-// directly to the right place in the developer console to flip the bit.
-message Help {
- // Describes a URL link.
- message Link {
- // Describes what the link offers.
- string description = 1;
-
- // The URL of the link.
- string url = 2;
- }
-
- // URL(s) pointing to additional information on handling the current error.
- repeated Link links = 1;
-}
-
-// Provides a localized error message that is safe to return to the user
-// which can be attached to an RPC error.
-message LocalizedMessage {
- // The locale used following the specification defined at
- // http://www.rfc-editor.org/rfc/bcp/bcp47.txt.
- // Examples are: "en-US", "fr-CH", "es-MX"
- string locale = 1;
-
- // The localized error message in the above locale.
- string message = 2;
-}
diff --git a/proto/google/rpc/status.proto b/proto/google/rpc/status.proto
deleted file mode 100644
index 3b1f7a9..0000000
--- a/proto/google/rpc/status.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2020 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.rpc;
-
-import "google/protobuf/any.proto";
-
-option cc_enable_arenas = true;
-option go_package = "google.golang.org/genproto/googleapis/rpc/status;status";
-option java_multiple_files = true;
-option java_outer_classname = "StatusProto";
-option java_package = "com.google.rpc";
-option objc_class_prefix = "RPC";
-
-// The `Status` type defines a logical error model that is suitable for
-// different programming environments, including REST APIs and RPC APIs. It is
-// used by [gRPC](https://github.com/grpc). Each `Status` message contains
-// three pieces of data: error code, error message, and error details.
-//
-// You can find out more about this error model and how to work with it in the
-// [API Design Guide](https://cloud.google.com/apis/design/errors).
-message Status {
- // The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
- int32 code = 1;
-
- // A developer-facing error message, which should be in English. Any
- // user-facing error message should be localized and sent in the
- // [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
- string message = 2;
-
- // A list of messages that carry the error details. There is a common set of
- // message types for APIs to use.
- repeated google.protobuf.Any details = 3;
-}
diff --git a/src/bin/server.rs b/src/bin/server.rs
new file mode 100644
index 0000000..d9c1eee
--- /dev/null
+++ b/src/bin/server.rs
@@ -0,0 +1,15 @@
+use rocketmq::pb::messaging_service_server::MessagingServiceServer;
+use rocketmq::service::ServerService;
+use tonic::transport::Server;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let address = "127.0.0.1:5001".parse().unwrap();
+ let service = ServerService::default();
+ println!("Server listens {}", address);
+ Server::builder()
+ .add_service(MessagingServiceServer::new(service))
+ .serve(address)
+ .await?;
+ Ok(())
+}
diff --git a/src/lib.rs b/src/lib.rs
index 346484c..813a8ee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,26 +1,3 @@
-pub mod apache {
- pub mod rocketmq {
- pub mod v1 {
- tonic::include_proto!("apache.rocketmq.v1");
- }
- }
-}
-
-pub mod google {
- pub mod rpc {
- tonic::include_proto!("google.rpc");
- }
-}
-
-pub mod org_apache_rocketmq {
-
- #[derive(Debug)]
- struct RpcClient {}
-}
-
-#[cfg(test)]
-pub mod tests {
-
- #[test]
- fn it_works() {}
-}
+pub mod pb;
+pub mod rocketmq;
+pub mod service;
diff --git a/src/pb.rs b/src/pb.rs
new file mode 100644
index 0000000..d30fe30
--- /dev/null
+++ b/src/pb.rs
@@ -0,0 +1 @@
+tonic::include_proto!("apache.rocketmq.v2");
diff --git a/src/rocketmq.rs b/src/rocketmq.rs
new file mode 100644
index 0000000..8c4f440
--- /dev/null
+++ b/src/rocketmq.rs
@@ -0,0 +1,90 @@
+use crate::pb::{
+ messaging_service_client::MessagingServiceClient, QueryRouteRequest, QueryRouteResponse,
+ Resource,
+};
+use rustls::client::ServerCertVerifier;
+use tonic::{
+ transport::{Channel, ClientTlsConfig},
+ Request, Response,
+};
+
+pub struct RpcClient {
+ stub: MessagingServiceClient<Channel>,
+ remote_address: String,
+}
+
+struct TrustAllCertVerifier;
+
+impl ServerCertVerifier for TrustAllCertVerifier {
+ fn verify_server_cert(
+ &self,
+ end_entity: &rustls::Certificate,
+ intermediates: &[rustls::Certificate],
+ server_name: &rustls::ServerName,
+ scts: &mut dyn Iterator<Item = &[u8]>,
+ ocsp_response: &[u8],
+ now: std::time::SystemTime,
+ ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
+ Ok(rustls::client::ServerCertVerified::assertion())
+ }
+}
+
+impl RpcClient {
+ pub async fn new(target: &'static str) -> Result<RpcClient, Box<dyn std::error::Error>> {
+ let remote_address = String::from(target);
+
+ let mut channel = Channel::from_shared(target)?
+ .tcp_nodelay(true)
+ .connect_timeout(std::time::Duration::from_secs(3));
+ if remote_address.starts_with("https://") {
+ let verifier = std::sync::Arc::new(TrustAllCertVerifier {});
+ let rustls_config = rustls::client::ClientConfig::builder()
+ .with_safe_defaults()
+ .with_custom_certificate_verifier(verifier)
+ .with_no_client_auth();
+ // TODO: Disable server certificate verification; `rustls_config` above is not yet applied to `tls_config`.
+ let tls_config = ClientTlsConfig::new();
+ channel = channel.tls_config(tls_config)?;
+ }
+ let channel = channel.connect().await?;
+ let stub = MessagingServiceClient::new(channel);
+ Ok(RpcClient {
+ stub,
+ remote_address,
+ })
+ }
+
+ pub async fn query_route(
+ &mut self,
+ request: QueryRouteRequest,
+ ) -> Result<Response<QueryRouteResponse>, Box<dyn std::error::Error>> {
+ let req = Request::new(request);
+ Ok(self.stub.query_route(req).await?)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_connect() {
+ let target = "http://127.0.0.1:5001";
+ let mut rpc_client = RpcClient::new(target)
+ .await
+ .expect("Should be able to connect");
+ let topic = Resource {
+ resource_namespace: String::from("arn"),
+ name: String::from("TestTopic"),
+ };
+ let request = QueryRouteRequest {
+ topic: Some(topic),
+ endpoints: None,
+ };
+
+ let reply = rpc_client
+ .query_route(request)
+ .await
+ .expect("Failed to query route");
+ }
+}
diff --git a/src/service.rs b/src/service.rs
new file mode 100644
index 0000000..08c2c8f
--- /dev/null
+++ b/src/service.rs
@@ -0,0 +1,156 @@
+use crate::pb::{
+ messaging_service_server::MessagingService, messaging_service_server::MessagingServiceServer,
+ AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest,
+ ChangeInvisibleDurationResponse, EndTransactionRequest, EndTransactionResponse,
+ ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse,
+ HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
+ NotifyClientTerminationResponse, PullMessageRequest, PullMessageResponse,
+ QueryAssignmentRequest, QueryAssignmentResponse, QueryOffsetRequest, QueryOffsetResponse,
+ QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse,
+ SendMessageRequest, SendMessageResponse, TelemetryCommand,
+};
+use futures::Stream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
+
+#[derive(Default)]
+pub struct ServerService {}
+
+type ResponseStream =
+ std::pin::Pin<Box<dyn Stream<Item = Result<TelemetryCommand, Status>> + Send>>;
+
+#[tonic::async_trait]
+impl MessagingService for ServerService {
+ type TelemetryStream = ResponseStream;
+ async fn query_route(
+ &self,
+ request: Request<QueryRouteRequest>,
+ ) -> Result<Response<QueryRouteResponse>, Status> {
+ println!("{:?}", request);
+ let reply = QueryRouteResponse {
+ status: None,
+ message_queues: vec![],
+ };
+ Ok(Response::new(reply))
+ }
+
+ async fn heartbeat(
+ &self,
+ request: Request<HeartbeatRequest>,
+ ) -> Result<Response<HeartbeatResponse>, Status> {
+ let reply = HeartbeatResponse { status: None };
+ Ok(Response::new(reply))
+ }
+
+ async fn send_message(
+ &self,
+ request: Request<SendMessageRequest>,
+ ) -> Result<Response<SendMessageResponse>, Status> {
+ let reply = SendMessageResponse {
+ status: None,
+ receipts: vec![],
+ };
+ Ok(Response::new(reply))
+ }
+
+ async fn query_assignment(
+ &self,
+ request: Request<QueryAssignmentRequest>,
+ ) -> Result<Response<QueryAssignmentResponse>, Status> {
+ let reply = QueryAssignmentResponse {
+ status: None,
+ assignments: vec![],
+ };
+ Ok(Response::new(reply))
+ }
+
+ async fn receive_message(
+ &self,
+ request: Request<ReceiveMessageRequest>,
+ ) -> Result<Response<ReceiveMessageResponse>, Status> {
+ let reply = ReceiveMessageResponse {
+ status: None,
+ delivery_timestamp: None,
+ invisible_duration: None,
+ messages: vec![],
+ };
+ Ok(Response::new(reply))
+ }
+
+ async fn ack_message(
+ &self,
+ request: Request<AckMessageRequest>,
+ ) -> Result<Response<AckMessageResponse>, Status> {
+ let reply = AckMessageResponse { status: None };
+ Ok(Response::new(reply))
+ }
+
+ async fn forward_message_to_dead_letter_queue(
+ &self,
+ request: Request<ForwardMessageToDeadLetterQueueRequest>,
+ ) -> Result<Response<ForwardMessageToDeadLetterQueueResponse>, Status> {
+ let reply = ForwardMessageToDeadLetterQueueResponse { status: None };
+ Ok(Response::new(reply))
+ }
+
+ /// Commits or rolls back one transactional message.
+ async fn end_transaction(
+ &self,
+ request: Request<EndTransactionRequest>,
+ ) -> Result<Response<EndTransactionResponse>, Status> {
+ let reply = EndTransactionResponse { status: None };
+ Ok(Response::new(reply))
+ }
+
+ async fn query_offset(
+ &self,
+ request: Request<QueryOffsetRequest>,
+ ) -> Result<Response<QueryOffsetResponse>, Status> {
+ let reply = QueryOffsetResponse {
+ status: None,
+ offset: 0,
+ };
+ Ok(Response::new(reply))
+ }
+
+ async fn pull_message(
+ &self,
+ request: Request<PullMessageRequest>,
+ ) -> Result<Response<PullMessageResponse>, Status> {
+ let reply = PullMessageResponse {
+ status: None,
+ min_offset: 0,
+ next_offset: 0,
+ max_offset: 10,
+ messages: vec![],
+ };
+ Ok(Response::new(reply))
+ }
+
+ /// Handles the `Telemetry` stream; not implemented yet, so calls fail with `Status::aborted`.
+ async fn telemetry(
+ &self,
+ request: Request<Streaming<TelemetryCommand>>,
+ ) -> Result<Response<Self::TelemetryStream>, Status> {
+ Err(Status::aborted("NotImplemented"))
+ }
+
+ /// Notify the server that the client is terminated.
+ async fn notify_client_termination(
+ &self,
+ request: Request<NotifyClientTerminationRequest>,
+ ) -> Result<Response<NotifyClientTerminationResponse>, Status> {
+ let reply = NotifyClientTerminationResponse { status: None };
+ Ok(Response::new(reply))
+ }
+
+ async fn change_invisible_duration(
+ &self,
+ request: Request<ChangeInvisibleDurationRequest>,
+ ) -> Result<Response<ChangeInvisibleDurationResponse>, Status> {
+ let reply = ChangeInvisibleDurationResponse {
+ status: None,
+ receipt_handle: String::from("receipt-handle"),
+ };
+ Ok(Response::new(reply))
+ }
+}