This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-rust.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ce8924  Use protocol v2
0ce8924 is described below

commit 0ce8924fa2fcb0f15c424d400e83f9a4dc46a5d4
Author: Li Zhanhui <[email protected]>
AuthorDate: Sat Apr 2 08:07:03 2022 +0000

    Use protocol v2
---
 Cargo.lock                                     | 394 ++++++++++++++++++--
 Cargo.toml                                     |  20 +-
 build.rs                                       |   4 +-
 proto/apache/rocketmq/v1/definition.proto      | 343 ------------------
 proto/apache/rocketmq/{v1 => v2}/admin.proto   |  10 +-
 proto/apache/rocketmq/v2/definition.proto      | 475 +++++++++++++++++++++++++
 proto/apache/rocketmq/{v1 => v2}/service.proto | 363 ++++++-------------
 proto/google/rpc/error_details.proto           | 247 -------------
 proto/google/rpc/status.proto                  |  47 ---
 src/bin/server.rs                              |  15 +
 src/lib.rs                                     |  29 +-
 src/pb.rs                                      |   1 +
 src/rocketmq.rs                                |  90 +++++
 src/service.rs                                 | 156 ++++++++
 14 files changed, 1235 insertions(+), 959 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a35947b..ff88454 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,13 +3,10 @@
 version = 3
 
 [[package]]
-name = "aho-corasick"
-version = "0.7.18"
+name = "adler"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
-dependencies = [
- "memchr",
-]
+checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
 
 [[package]]
 name = "anyhow"
@@ -56,6 +53,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
 [[package]]
+name = "axum"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5611d4977882c5af1c0f7a34d51b5d87f784f86912bb543986b014ea4995ef93"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bitflags",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "serde",
+ "sync_wrapper",
+ "tokio",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "95cd109b3e93c9541dcce5b0219dcf89169dcc58c1bebed65082808324258afb"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+]
+
+[[package]]
 name = "base64"
 version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -68,18 +108,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
+name = "bumpalo"
+version = "3.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
+
+[[package]]
 name = "bytes"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
 
 [[package]]
+name = "cc"
+version = "1.0.73"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
+
+[[package]]
 name = "cfg-if"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
+name = "cmake"
+version = "0.1.48"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "crc32fast"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
 name = "either"
 version = "1.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -101,18 +171,45 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
 
 [[package]]
+name = "flate2"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
+dependencies = [
+ "cfg-if",
+ "crc32fast",
+ "libc",
+ "miniz_oxide",
+]
+
+[[package]]
 name = "fnv"
 version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
 [[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
 name = "futures-channel"
 version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
 dependencies = [
  "futures-core",
+ "futures-sink",
 ]
 
 [[package]]
@@ -122,6 +219,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
 
 [[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
 name = "futures-sink"
 version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -140,6 +243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
  "futures-core",
+ "futures-sink",
  "futures-task",
  "pin-project-lite",
  "pin-utils",
@@ -183,12 +287,9 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
 
 [[package]]
 name = "heck"
-version = "0.3.3"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
-dependencies = [
- "unicode-segmentation",
-]
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
 
 [[package]]
 name = "hermit-abi"
@@ -222,6 +323,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "http-range-header"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
+
+[[package]]
 name = "httparse"
 version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -304,6 +411,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
 
 [[package]]
+name = "js-sys"
+version = "0.3.56"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04"
+dependencies = [
+ "wasm-bindgen",
+]
+
+[[package]]
 name = "lazy_static"
 version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -334,12 +450,34 @@ dependencies = [
 ]
 
 [[package]]
+name = "matchit"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
+
+[[package]]
 name = "memchr"
 version = "2.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
 
 [[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
+[[package]]
+name = "miniz_oxide"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
+dependencies = [
+ "adler",
+ "autocfg",
+]
+
+[[package]]
 name = "mio"
 version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -470,6 +608,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
 
 [[package]]
+name = "prettyplease"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3b83ec2d0af5c5c556257ff52c9f98934e243b9fd39604bfb2a9b75ec2e97f18"
+dependencies = [
+ "proc-macro2",
+ "syn",
+]
+
+[[package]]
 name = "proc-macro2"
 version = "1.0.36"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -480,9 +628,9 @@ dependencies = [
 
 [[package]]
 name = "prost"
-version = "0.9.0"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
+checksum = "1bd5316aa8f5c82add416dfbc25116b84b748a21153f512917e8143640a71bbd"
 dependencies = [
  "bytes",
  "prost-derive",
@@ -490,11 +638,13 @@ dependencies = [
 
 [[package]]
 name = "prost-build"
-version = "0.9.0"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
+checksum = "328f9f29b82409216decb172d81e936415d21245befa79cd34c3f29d87d1c50b"
 dependencies = [
  "bytes",
+ "cfg-if",
+ "cmake",
  "heck",
  "itertools",
  "lazy_static",
@@ -510,9 +660,9 @@ dependencies = [
 
 [[package]]
 name = "prost-derive"
-version = "0.9.0"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
+checksum = "df35198f0777b75e9ff669737c6da5136b59dba33cf5a010a6d1cc4d56defc6f"
 dependencies = [
  "anyhow",
  "itertools",
@@ -523,9 +673,9 @@ dependencies = [
 
 [[package]]
 name = "prost-types"
-version = "0.9.0"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
+checksum = "926681c118ae6e512a3ccefd4abbe5521a14f4cc1e207356d4d00c0b7f2006fd"
 dependencies = [
  "bytes",
  "prost",
@@ -581,12 +731,10 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.5.4"
+version = "1.5.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
+checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
 dependencies = [
- "aho-corasick",
- "memchr",
  "regex-syntax",
 ]
 
@@ -606,23 +754,78 @@ dependencies = [
 ]
 
 [[package]]
-name = "rocketmq-client-rust"
+name = "ring"
+version = "0.16.20"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
+dependencies = [
+ "cc",
+ "libc",
+ "once_cell",
+ "spin",
+ "untrusted",
+ "web-sys",
+ "winapi",
+]
+
+[[package]]
+name = "rocketmq"
 version = "0.1.0"
 dependencies = [
+ "futures",
  "prost",
  "prost-types",
+ "rustls",
  "tokio",
+ "tokio-rustls",
  "tonic",
  "tonic-build",
 ]
 
 [[package]]
+name = "rustls"
+version = "0.20.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921"
+dependencies = [
+ "log",
+ "ring",
+ "sct",
+ "webpki",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
+dependencies = [
+ "base64",
+]
+
+[[package]]
 name = "scopeguard"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
+name = "sct"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
+[[package]]
+name = "serde"
+version = "1.0.136"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
+
+[[package]]
 name = "signal-hook-registry"
 version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -654,6 +857,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "spin"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+
+[[package]]
 name = "syn"
 version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -665,6 +874,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "sync_wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
+
+[[package]]
 name = "tempfile"
 version = "3.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -720,6 +935,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "tokio-rustls"
+version = "0.23.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e"
+dependencies = [
+ "rustls",
+ "tokio",
+ "webpki",
+]
+
+[[package]]
 name = "tokio-stream"
 version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -760,14 +986,16 @@ dependencies = [
 
 [[package]]
 name = "tonic"
-version = "0.6.2"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
+checksum = "9a1a361140b1af3f548e0a5105126b3fc737542f6cd4947b66419c80be07db22"
 dependencies = [
  "async-stream",
  "async-trait",
+ "axum",
  "base64",
  "bytes",
+ "flate2",
  "futures-core",
  "futures-util",
  "h2",
@@ -779,9 +1007,11 @@ dependencies = [
  "pin-project",
  "prost",
  "prost-derive",
+ "rustls-pemfile",
  "tokio",
+ "tokio-rustls",
  "tokio-stream",
- "tokio-util 0.6.9",
+ "tokio-util 0.7.0",
  "tower",
  "tower-layer",
  "tower-service",
@@ -791,10 +1021,11 @@ dependencies = [
 
 [[package]]
 name = "tonic-build"
-version = "0.6.2"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
+checksum = "4d17087af5c80e5d5fc8ba9878e60258065a0a757e35efe7a05b7904bece1943"
 dependencies = [
+ "prettyplease",
  "proc-macro2",
  "prost-build",
  "quote",
@@ -822,6 +1053,25 @@ dependencies = [
 ]
 
 [[package]]
+name = "tower-http"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8"
+dependencies = [
+ "bitflags",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
 name = "tower-layer"
 version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -883,18 +1133,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
 
 [[package]]
-name = "unicode-segmentation"
-version = "1.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
-
-[[package]]
 name = "unicode-xid"
 version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
 
 [[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
+[[package]]
 name = "want"
 version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
@@ -911,6 +1161,80 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
 
 [[package]]
+name = "wasm-bindgen"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
+dependencies = [
+ "cfg-if",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca"
+dependencies = [
+ "bumpalo",
+ "lazy_static",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.79"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
+
+[[package]]
+name = "web-sys"
+version = "0.3.56"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "webpki"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
+[[package]]
 name = "which"
 version = "4.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
diff --git a/Cargo.toml b/Cargo.toml
index 52064d8..42483bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,15 +1,23 @@
 [package]
-name = "rocketmq-client-rust"
+name = "rocketmq"
 version = "0.1.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-tonic = "0.6"
-prost = "0.9"
-prost-types = "0.9"
-tokio = { version = "1.0", features = ["full"]}
+futures = { version = "0.3", default-features = false, features = ["alloc"] }
+tonic = { version = "0.7", features = ["default", "tls", "compression"] }
+prost = "0.10"
+prost-types = "0.10"
+tokio = { version = "1", features = ["full"]}
+tokio-rustls = "0.23"
+rustls = {version = "0.20", features = ["default", "dangerous_configuration"]}
 
 [build-dependencies]
-tonic-build = "0.6"
\ No newline at end of file
+tonic-build = {version = "0.7", features = ["default", "compression"]}
+
+
+[[bin]]
+name = "server"
+path = "src/bin/server.rs"
\ No newline at end of file
diff --git a/build.rs b/build.rs
index d210350..a3f0af4 100644
--- a/build.rs
+++ b/build.rs
@@ -1,8 +1,8 @@
 fn main() {
-    let idl_files = &["proto/apache/rocketmq/v1/service.proto"];
+    let idl_files = &["proto/apache/rocketmq/v2/service.proto"];
     tonic_build::configure()
         .build_client(true)
-        .build_server(false)
+        .build_server(true)
         .compile(idl_files, &["proto"])
         .unwrap_or_else(|e| panic!("protoc failed: {}", e));
 }
diff --git a/proto/apache/rocketmq/v1/definition.proto b/proto/apache/rocketmq/v1/definition.proto
deleted file mode 100644
index 898695b..0000000
--- a/proto/apache/rocketmq/v1/definition.proto
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-package apache.rocketmq.v1;
-
-enum Permission {
-  NONE = 0;
-  READ = 1;
-  WRITE = 2;
-  READ_WRITE = 3;
-
-  reserved 4 to 64;
-}
-
-enum FilterType {
-  TAG = 0;
-  SQL = 1;
-
-  reserved 2 to 64;
-}
-
-message FilterExpression {
-  FilterType type = 1;
-  string expression = 2;
-
-  reserved 3 to 64;
-}
-
-// Dead lettering is done on a best effort basis. The same message might be
-// dead lettered multiple times.
-//
-// If validation on any of the fields fails at subscription creation/update,
-// the create/update subscription request will fail.
-message DeadLetterPolicy {
-  // The maximum number of delivery attempts for any message.
-  //
-  // This field will be honored on a best effort basis.
-  //
-  // If this parameter is 0, a default value of 16 is used.
-  int32 max_delivery_attempts = 1;
-
-  reserved 2 to 64;
-}
-
-message Resource {
-  string resource_namespace = 1;
-
-  // Resource name identifier, which remains unique within the abstract resource
-  // namespace.
-  string name = 2;
-
-  reserved 3 to 64;
-}
-
-enum ConsumeModel {
-  CLUSTERING = 0;
-  BROADCASTING = 1;
-
-  reserved 2 to 64;
-}
-
-message ProducerData {
-  Resource group = 1;
-
-  reserved 2 to 64;
-}
-
-enum ConsumePolicy {
-  RESUME = 0;
-  PLAYBACK = 1;
-  DISCARD = 2;
-  TARGET_TIMESTAMP = 3;
-
-  reserved 4 to 64;
-}
-
-enum ConsumeMessageType {
-  ACTIVE = 0;
-  PASSIVE = 1;
-
-  reserved 2 to 64;
-}
-
-message ConsumerData {
-  Resource group = 1;
-
-  repeated SubscriptionEntry subscriptions = 2;
-
-  ConsumeModel consume_model = 3;
-
-  ConsumePolicy consume_policy = 4;
-
-  DeadLetterPolicy dead_letter_policy = 5;
-
-  ConsumeMessageType consume_type = 6;
-
-  reserved 7 to 64;
-}
-
-message SubscriptionEntry {
-  Resource topic = 1;
-  FilterExpression expression = 2;
-
-  reserved 3 to 64;
-}
-
-enum AddressScheme {
-  IPv4 = 0;
-  IPv6 = 1;
-  DOMAIN_NAME = 2;
-
-  reserved 3 to 64;
-}
-
-message Address {
-  string host = 1;
-  int32 port = 2;
-
-  reserved 3 to 64;
-}
-
-message Endpoints {
-  AddressScheme scheme = 1;
-  repeated Address addresses = 2;
-
-  reserved 3 to 64;
-}
-
-message Broker {
-  // Name of the broker
-  string name = 1;
-
-  // Broker index. Canonically, index = 0 implies that the broker is playing
-  // leader role while brokers with index > 0 play follower role.
-  int32 id = 2;
-
-  // Address of the broker, complying with the following scheme
-  // 1. dns:[//authority/]host[:port]
-  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
-  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
-  Endpoints endpoints = 3;
-
-  reserved 4 to 64;
-}
-
-message Partition {
-  Resource topic = 1;
-  int32 id = 2;
-  Permission permission = 3;
-  Broker broker = 4;
-
-  reserved 5 to 64;
-}
-
-enum MessageType {
-  NORMAL = 0;
-
-  // Sequenced message
-  FIFO = 1;
-
-  // Messages that are delivered after the specified duration.
-  DELAY = 2;
-
-  // Messages that are transactional. Only committed messages are delivered to
-  // subscribers.
-  TRANSACTION = 3;
-
-  reserved 4 to 64;
-}
-
-enum DigestType {
-  // CRC algorithm achieves goal of detecting random data error with lowest
-  // computation overhead.
-  CRC32 = 0;
-
-  // MD5 algorithm achieves good balance between collision rate and computation
-  // overhead.
-  MD5 = 1;
-
-  // SHA-family has substantially fewer collision with fair amount of
-  // computation.
-  SHA1 = 2;
-
-  reserved 3 to 64;
-}
-
-// When publishing messages to or subscribing messages from brokers, clients
-// shall include or validate digests of message body to ensure data integrity.
-//
-// For message publishment, when an invalid digest were detected, brokers need
-// respond client with BAD_REQUEST.
-//
-// For messags subscription, when an invalid digest were detected, consumers
-// need to handle this case according to message type:
-// 1) Standard messages should be negatively acknowledged instantly, causing
-// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
-// previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
-message Digest {
-  DigestType type = 1;
-  string checksum = 2;
-
-  reserved 3 to 64;
-}
-
-enum Encoding {
-  IDENTITY = 0;
-  GZIP = 1;
-
-  reserved 2 to 64;
-}
-
-message SystemAttribute {
-  // Tag
-  string tag = 1;
-
-  // Message keys
-  repeated string keys = 2;
-
-  // Message identifier, client-side generated, remains unique.
-  // if message_id is empty, the send message request will be aborted with
-  // status `INVALID_ARGUMENT`
-  string message_id = 3;
-
-  // Message body digest
-  Digest body_digest = 4;
-
-  // Message body encoding. Candidate options are identity, gzip, snappy etc.
-  Encoding body_encoding = 5;
-
-  // Message type, normal, FIFO or transactional.
-  MessageType message_type = 6;
-
-  // Message born time-point.
-  google.protobuf.Timestamp born_timestamp = 7;
-
-  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
-  string born_host = 8;
-
-  // Time-point at which the message is stored in the broker.
-  google.protobuf.Timestamp store_timestamp = 9;
-
-  // The broker that stores this message. It may be name, IP or arbitrary
-  // identifier that uniquely identify the broker.
-  string store_host = 10;
-
-  oneof timed_delivery {
-    // Time-point at which broker delivers to clients.
-    google.protobuf.Timestamp delivery_timestamp = 11;
-
-    // Level-based delay strategy.
-    int32 delay_level = 12;
-  }
-
-  // If a message is acquired by way of POP, this field holds the receipt.
-  // Clients use the receipt to acknowledge or negatively acknowledge the
-  // message.
-  string receipt_handle = 13;
-
-  // Partition identifier in which a message is physically stored.
-  int32 partition_id = 14;
-
-  // Partition offset at which a message is stored.
-  int64 partition_offset = 15;
-
-  // Period of time servers would remain invisible once a message is acquired.
-  google.protobuf.Duration invisible_period = 16;
-
-  // Business code may failed to process messages for the moment. Hence, clients
-  // may request servers to deliver them again using certain back-off strategy,
-  // the attempt is 1 not 0 if message is delivered first time.
-  int32 delivery_attempt = 17;
-
-  // Message producer load-balance group if applicable.
-  Resource producer_group = 18;
-
-  string message_group = 19;
-
-  // Trace context.
-  string trace_context = 20;
-
-  // Delay time of first recover orphaned transaction request from server.
-  google.protobuf.Duration orphaned_transaction_recovery_period = 21;
-
-  reserved 22 to 64;
-}
-
-message Message {
-
-  Resource topic = 1;
-
-  // User defined key-value pairs.
-  // If user_attribute contains the reserved keys by RocketMQ,
-  // the send message request will be aborted with status `INVALID_ARGUMENT`.
-  // See below links for the reserved keys
-  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
-  map<string, string> user_attribute = 2;
-
-  SystemAttribute system_attribute = 3;
-
-  bytes body = 4;
-
-  reserved 5 to 64;
-}
-
-message Assignment {
-  Partition Partition = 1;
-
-  reserved 2 to 64;
-}
-
-enum QueryOffsetPolicy {
-  // Use this option if client wishes to playback all existing messages.
-  BEGINNING = 0;
-
-  // Use this option if client wishes to skip all existing messages.
-  END = 1;
-
-  // Use this option if time-based seek is targeted.
-  TIME_POINT = 2;
-
-  reserved 3 to 64;
-}
\ No newline at end of file
diff --git a/proto/apache/rocketmq/v1/admin.proto b/proto/apache/rocketmq/v2/admin.proto
similarity index 78%
rename from proto/apache/rocketmq/v1/admin.proto
rename to proto/apache/rocketmq/v2/admin.proto
index b452e97..7dbb702 100644
--- a/proto/apache/rocketmq/v1/admin.proto
+++ b/proto/apache/rocketmq/v2/admin.proto
@@ -15,7 +15,15 @@
 
 syntax = "proto3";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
 
 message ChangeLogLevelRequest {
   enum Level {
diff --git a/proto/apache/rocketmq/v2/definition.proto b/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..cd19a27
--- /dev/null
+++ b/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+  TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+  COMMIT = 1;
+  ROLLBACK = 2;
+}
+
+enum TransactionSource {
+  SOURCE_UNSPECIFIED = 0;
+  SOURCE_CLIENT = 1;
+  SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+  PERMISSION_UNSPECIFIED = 0;
+  NONE = 1;
+  READ = 2;
+  WRITE = 3;
+  READ_WRITE = 4;
+}
+
+enum FilterType {
+  FILTER_TYPE_UNSPECIFIED = 0;
+  TAG = 1;
+  SQL = 2;
+}
+
+message FilterExpression {
+  FilterType type = 1;
+  string expression = 2;
+}
+
+// Dead lettering is done on a best effort basis. The same message might be
+// dead lettered multiple times.
+//
+// If validation on any of the fields fails at subscription creation/update,
+// the create/update subscription request will fail.
+message DeadLetterPolicy {
+  // The maximum number of delivery attempts for any message.
+  //
+  // This field will be honored on a best effort basis.
+  //
+  // If this parameter is 0, a default value of 16 is used.
+  int32 max_delivery_attempts = 1;
+}
+
+message RetryPolicy {
+  int32 max_attempts = 1;
+  float initial_backoff = 2;
+  float max_backoff = 3;
+  float backoff_multiplier = 4;
+}
+
+message Resource {
+  string resource_namespace = 1;
+
+  // Resource name identifier, which remains unique within the abstract resource
+  // namespace.
+  string name = 2;
+}
+
+enum ConsumeMessageType {
+  CONSUME_MESSAGE_TYPE_UNSPECIFIED = 0;
+  ACTIVE = 1;
+  PASSIVE = 2;
+}
+
+message Trace {
+  bool on = 1;
+  Endpoints service_access_point = 2;
+}
+
+enum AuthenticationMethod {
+  AUTHENTICATION_METHOD_UNSPECIFIED = 0;
+  SASL = 1;
+  MUTUAL_TLS = 2;
+  HTTP_BASIC_AUTH = 3;
+}
+
+message Authentication {
+  AuthenticationMethod method = 1;
+  string identity = 2;
+}
+
+// Transport
+message Timeout {
+  google.protobuf.Duration connect = 1;
+
+  google.protobuf.Duration request = 2;
+
+  // Long polling duration
+  google.protobuf.Duration polling = 3;
+}
+
+message Publish {
+  // Publisher normally registers topics in interest, such that
+  // pre-conditions may be examined and validated.
+  repeated Resource topics = 1;
+
+  // If a transactional message stays unresolved for more than
+  // `transaction_orphan_threshold`, it would be regarded as an
+  // orphan. Servers that manage orphan messages would pick up
+  // a capable publisher to resolve it.
+  google.protobuf.Duration transaction_orphan_threshold = 2;
+
+  // If publishing message experiences RPC failure, `retry_policy` describes
+  // backoff policy before retries are made.
+  RetryPolicy retry_policy = 3;
+
+  // If message body size exceeds `compress_threshold`, it would be desirable to
+  // compress it to relieve network overhead.
+  int32 compress_threshold = 4;
+
+  // Max message size in bytes permitted by server.
+  int32 max_message_bytes = 5;
+}
+
+message CacheLimits {
+  int32 count = 1;
+  int64 bytes = 2;
+}
+
+message Subscription {
+  Resource group = 1;
+
+  repeated SubscriptionEntry subscriptions = 2;
+
+  DeadLetterPolicy dead_letter_policy = 3;
+
+  ConsumeMessageType consume_type = 4;
+
+  bool fifo = 5;
+
+  // For RPC
+  RetryPolicy retry_policy = 6;
+
+  // For PushConsumer
+  RetryPolicy consume_backoff_policy = 7;
+
+  int32 max_receive_batch_size = 8;
+
+  // After messages are received from servers, consumers normally split them
+  // into multiple mini-batches. Each mini-batch is assigned to a dedicated
+  // task, which will be submitted to thread-pool to run concurrently.
+  int32 messages_per_task = 9;
+
+  CacheLimits cache_limits = 10;
+
+  int32 consume_thread_count = 11;
+
+  // Up to server
+  google.protobuf.Duration max_invisible_duration = 12;
+}
+
+message SubscriptionEntry {
+  Resource topic = 1;
+  FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+  ADDRESS_SCHEME_UNSPECIFIED = 0;
+  IPv4 = 1;
+  IPv6 = 2;
+  DOMAIN_NAME = 3;
+}
+
+message Address {
+  string host = 1;
+  int32 port = 2;
+}
+
+message Endpoints {
+  AddressScheme scheme = 1;
+  repeated Address addresses = 2;
+}
+
+message Broker {
+  // Name of the broker
+  string name = 1;
+
+  // Broker index. Canonically, index = 0 implies that the broker is playing
+  // leader role while brokers with index > 0 play follower role.
+  int32 id = 2;
+
+  // Address of the broker, complying with the following scheme
+  // 1. dns:[//authority/]host[:port]
+  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+  Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+  Resource topic = 1;
+  int32 id = 2;
+  Permission permission = 3;
+  Broker broker = 4;
+  repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+
+  NORMAL = 1;
+
+  // Sequenced message
+  FIFO = 2;
+
+  // Messages that are delivered after the specified duration.
+  DELAY = 3;
+
+  // Messages that are transactional. Only committed messages are delivered to
+  // subscribers.
+  TRANSACTION = 4;
+}
+
+enum DigestType {
+  DIGEST_TYPE_UNSPECIFIED = 0;
+
+  // CRC algorithm achieves goal of detecting random data error with lowest
+  // computation overhead.
+  CRC32 = 1;
+
+  // MD5 algorithm achieves good balance between collision rate and computation
+  // overhead.
+  MD5 = 2;
+
+  // SHA-family has substantially fewer collision with fair amount of
+  // computation.
+  SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest is detected, brokers need
+// to respond to the client with BAD_REQUEST.
+//
+// For message subscription, when an invalid digest is detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digest are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+  DigestType type = 1;
+  string checksum = 2;
+}
+
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+
+  IDENTITY = 1;
+
+  GZIP = 2;
+}
+
+message SystemProperties {
+  // Tag
+  string tag = 1;
+
+  // Message keys
+  repeated string keys = 2;
+
+  // Message identifier, client-side generated, remains unique.
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
+  string message_id = 3;
+
+  // Message body digest
+  Digest body_digest = 4;
+
+  // Message body encoding. Candidate options are identity, gzip, snappy etc.
+  Encoding body_encoding = 5;
+
+  // Message type, normal, FIFO or transactional.
+  MessageType message_type = 6;
+
+  // Message born time-point.
+  google.protobuf.Timestamp born_timestamp = 7;
+
+  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+  string born_host = 8;
+
+  // Time-point at which the message is stored in the broker.
+  google.protobuf.Timestamp store_timestamp = 9;
+
+  // The broker that stores this message. It may be broker name, IP or arbitrary
+  // identifier that uniquely identifies the server.
+  string store_host = 10;
+
+  // Time-point at which broker delivers to clients.
+  google.protobuf.Timestamp delivery_timestamp = 11;
+
+  // If a message is acquired by way of POP, this field holds the receipt.
+  // Clients use the receipt to acknowledge or negatively acknowledge the
+  // message.
+  string receipt_handle = 12;
+
+  // Message queue identifier in which a message is physically stored.
+  int32 queue_id = 13;
+
+  // Message-queue offset at which a message is stored.
+  int64 queue_offset = 14;
+
+  // Period of time servers would remain invisible once a message is acquired.
+  google.protobuf.Duration invisible_duration = 15;
+
+  // Business code may fail to process messages for the moment. Hence, clients
+  // may request servers to deliver them again using certain back-off strategy,
+  // the attempt is 1 not 0 if message is delivered first time.
+  int32 delivery_attempt = 16;
+
+  string message_group = 17;
+
+  // Trace context.
+  string trace_context = 18;
+
+  // Delay time of first recover orphaned transaction request from server.
+  google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+
+  Resource topic = 1;
+
+  // User defined key-value pairs.
+  // If user_properties contain the reserved keys by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
+  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+  map<string, string> user_properties = 2;
+
+  SystemProperties system_properties = 3;
+
+  bytes body = 4;
+}
+
+message Assignment { MessageQueue message_queue = 1; }
+
+enum QueryOffsetPolicy {
+  QUERY_OFFSET_POLICY_UNSPECIFIED = 0;
+  
+  // Use this option if client wishes to playback all existing messages.
+  BEGINNING = 1;
+
+  // Use this option if client wishes to skip all existing messages.
+  END = 2;
+
+  // Use this option if time-based seek is targeted.
+  TIME_POINT = 3;
+}
+
+message SendReceipt {
+  string message_id = 1;
+  string transaction_id = 2;
+}
+
+enum Code {
+  // Success.
+  OK = 0;
+  // Format of access point is illegal.
+  ILLEGAL_ACCESS_POINT = 1;
+  // Format of topic is illegal.
+  ILLEGAL_TOPIC = 2;
+  // Format of consumer group is illegal.
+  ILLEGAL_CONSUMER_GROUP = 3;
+  // Format of message tag is illegal.
+  ILLEGAL_MESSAGE_TAG = 4;
+  // Format of message key is illegal.
+  ILLEGAL_MESSAGE_KEY = 5;
+  // Size of message keys exceeds the threshold.
+  MESSAGE_KEYS_TOO_LARGE = 6;
+  // Format of message group is illegal.
+  ILLEGAL_MESSAGE_GROUP = 7;
+  // Format of message property key is illegal.
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 9;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 10;
+
+  // User does not have the permission to operate.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
+  FORBIDDEN = 403;
+
+  // Code indicates that the client request has not been completed
+  // because it lacks valid authentication credentials for the
+  // requested resource.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
+  UNAUTHORIZED = 401;
+
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 13;
+
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 14;
+
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, as is violating FIFO semantics.
+  VERIFY_MESSAGE_FORBIDDEN = 15;
+
+  // Failed to consume message.
+  FAILED_TO_CONSUME_MESSAGE = 16;
+
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 17;
+
+  // Too many requests are made in short period of duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 18;
+
+  // Expired receipt-handle is used when trying to acknowledge or change
+  // invisible duration of a message
+  RECEIPT_HANDLE_EXPIRED = 19;
+
+  // Message property does not match the message type.
+  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+
+  // Code indicates that the server encountered an unexpected condition
+  // that prevented it from fulfilling the request.
+  // This error response is a generic "catch-all" response.
+  // Usually, this indicates the server cannot find a better alternative
+  // error code to response. Sometimes, server administrators log error
+  // responses like the 500 status code with more details about the request
+  // to prevent the error from happening again in the future.
+  //
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+  INTERNAL_SERVER_ERROR = 500;
+
+  // Code means that the server or client does not support the functionality
+  // required to fulfill the request.
+  NOT_IMPLEMENTED = 501;
+
+  // Code indicates that the server, while acting as a gateway or proxy,
+  // did not get a response in time from the upstream server that
+  // it needed in order to complete the request.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  GATEWAY_TIMEOUT = 504;
+}
+
+message Status {
+  Code code = 1;
+  string message = 2;
+}
\ No newline at end of file
diff --git a/proto/apache/rocketmq/v1/service.proto b/proto/apache/rocketmq/v2/service.proto
similarity index 55%
rename from proto/apache/rocketmq/v1/service.proto
rename to proto/apache/rocketmq/v2/service.proto
index 77fa63f..f01c55d 100644
--- a/proto/apache/rocketmq/v1/service.proto
+++ b/proto/apache/rocketmq/v2/service.proto
@@ -17,35 +17,29 @@ syntax = "proto3";
 
 import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
 
-import "apache/rocketmq/v1/definition.proto";
+import "apache/rocketmq/v2/definition.proto";
 
-package apache.rocketmq.v1;
+package apache.rocketmq.v2;
 
-message ResponseCommon {
-  google.rpc.Status status = 1;
-  google.rpc.RequestInfo request_info = 2;
-  google.rpc.Help help = 3;
-  google.rpc.RetryInfo retry_info = 4;
-  google.rpc.DebugInfo debug_info = 5;
-  google.rpc.ErrorInfo error_info = 6;
-
-  reserved 7 to 64;
-}
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
 
 // Topics are destination of messages to publish to or subscribe from. Similar
 // to domain names, they will be addressable after resolution through the
 // provided access point.
 //
 // Access points are usually the addresses of name servers, which fulfill
-// service discovery, load-balancing and other auxillary services. Name servers
+// service discovery, load-balancing and other auxiliary services. Name servers
 // receive periodic heartbeats from affiliate brokers and erase those which
 // failed to maintain alive status.
 //
 // Name servers answer queries of QueryRouteRequest, responding clients with
-// addressable partitions, which they may directly publish messages to or
+// addressable message-queues, which they may directly publish messages to or
 // subscribe messages from.
 //
 // QueryRouteRequest shall include source endpoints, aka, configured
@@ -56,31 +50,22 @@ message QueryRouteRequest {
   Resource topic = 1;
 
   Endpoints endpoints = 2;
-
-  reserved 3 to 64;
 }
 
 message QueryRouteResponse {
-  ResponseCommon common = 1;
+  Status status = 1;
 
-  repeated Partition partitions = 2;
-
-  reserved 3 to 64;
+  repeated MessageQueue message_queues = 2;
 }
 
 message SendMessageRequest {
-  Message message = 1;
-  Partition partition = 2;
-
-  reserved 3 to 64;
+  repeated Message messages = 1;
+  MessageQueue message_queue = 2;
 }
 
 message SendMessageResponse {
-  ResponseCommon common = 1;
-  string message_id = 2;
-  string transaction_id = 3;
-
-  reserved 4 to 64;
+  Status status = 1;
+  repeated SendReceipt receipts = 2;
 }
 
 message QueryAssignmentRequest {
@@ -90,77 +75,41 @@ message QueryAssignmentRequest {
 
   // Service access point
   Endpoints endpoints = 4;
-
-  reserved 5 to 64;
 }
 
 message QueryAssignmentResponse {
-  ResponseCommon common = 1;
+  Status status = 1;
   repeated Assignment assignments = 2;
-
-  reserved 3 to 64;
 }
 
 message ReceiveMessageRequest {
   Resource group = 1;
   string client_id = 2;
-  Partition partition = 3;
+  MessageQueue message_queue = 3;
   FilterExpression filter_expression = 4;
-  ConsumePolicy consume_policy = 5;
-  google.protobuf.Timestamp initialization_timestamp = 6;
-  int32 batch_size = 7;
-  google.protobuf.Duration invisible_duration = 8;
-  google.protobuf.Duration await_time = 9;
-  bool fifo_flag = 10;
-
-  reserved 11 to 64;
+  google.protobuf.Timestamp initialization_timestamp = 5;
+  int32 batch_size = 6;
+  google.protobuf.Duration invisible_duration = 7;
+  google.protobuf.Duration await_duration = 8;
+  bool fifo = 9;
 }
 
 message ReceiveMessageResponse {
-  ResponseCommon common = 1;
+  Status status = 1;
   repeated Message messages = 2;
   google.protobuf.Timestamp delivery_timestamp = 3;
   google.protobuf.Duration invisible_duration = 4;
-
-  reserved 5 to 64;
 }
 
 message AckMessageRequest {
   Resource group = 1;
   Resource topic = 2;
   string client_id = 3;
-  oneof handle {
-    string receipt_handle = 4;
-    int64 offset = 5;
-  }
-  string message_id = 6;
-
-  reserved 7 to 64;
-}
-
-message AckMessageResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message NackMessageRequest {
-  Resource group = 1;
-  Resource topic = 2;
-  string client_id = 3;
   string receipt_handle = 4;
   string message_id = 5;
-  int32 delivery_attempt = 6;
-  int32 max_delivery_attempts = 7;
-
-  reserved 8 to 64;
 }
 
-message NackMessageResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
+message AckMessageResponse { Status status = 1; }
 
 message ForwardMessageToDeadLetterQueueRequest {
   Resource group = 1;
@@ -170,196 +119,130 @@ message ForwardMessageToDeadLetterQueueRequest {
   string message_id = 5;
   int32 delivery_attempt = 6;
   int32 max_delivery_attempts = 7;
-
-  reserved 8 to 64;
 }
 
-message ForwardMessageToDeadLetterQueueResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
 
 message HeartbeatRequest {
   string client_id = 1;
-  oneof client_data {
-    ProducerData producer_data = 2;
-    ConsumerData consumer_data = 3;
-  }
-  bool fifo_flag = 4;
-
-  reserved 5 to 64;
-}
-
-message HeartbeatResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message HealthCheckRequest {
-  Resource group = 1;
-  string client_host = 2;
-
-  reserved 3 to 64;
+  Resource group = 2;
 }
 
-message HealthCheckResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
+message HeartbeatResponse { Status status = 1; }
 
 message EndTransactionRequest {
   Resource group = 1;
   string message_id = 2;
   string transaction_id = 3;
-  enum TransactionResolution {
-    COMMIT = 0;
-    ROLLBACK = 1;
-  }
   TransactionResolution resolution = 4;
-  enum Source {
-    CLIENT = 0;
-    SERVER_CHECK = 1;
-  }
-  Source source = 5;
+  TransactionSource source = 5;
   string trace_context = 6;
-
-  reserved 7 to 64;
 }
 
-message EndTransactionResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
+message EndTransactionResponse { Status status = 1; }
 
 message QueryOffsetRequest {
-  Partition partition = 1;
+  MessageQueue message_queue = 1;
   QueryOffsetPolicy policy = 2;
   google.protobuf.Timestamp time_point = 3;
-
-  reserved 4 to 64;
 }
 
 message QueryOffsetResponse {
-  ResponseCommon common = 1;
+  Status status = 1;
   int64 offset = 2;
-
-  reserved 3 to 64;
 }
 
 message PullMessageRequest {
   Resource group = 1;
-  Partition partition = 2;
+  MessageQueue message_queue = 2;
   int64 offset = 3;
   int32 batch_size = 4;
   google.protobuf.Duration await_time = 5;
   FilterExpression filter_expression = 6;
   string client_id = 7;
-
-  reserved 8 to 64;
 }
 
 message PullMessageResponse {
-  ResponseCommon common = 1;
+  Status status = 1;
   int64 min_offset = 2;
   int64 next_offset = 3;
   int64 max_offset = 4;
   repeated Message messages = 5;
-
-  reserved 6 to 64;
 }
 
-message NoopCommand { reserved 1 to 64; }
-
-message PrintThreadStackTraceCommand {
-  string command_id = 1;
+message PrintThreadStackTraceCommand { int64 command_id = 1; }
 
-  reserved 2 to 64;
-}
-
-message ReportThreadStackTraceRequest {
-  string command_id = 1;
+message ThreadStackTrace {
+  int64 command_id = 1;
   string thread_stack_trace = 2;
-
-  reserved 3 to 64;
 }
 
-message ReportThreadStackTraceResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
-}
-
-message VerifyMessageConsumptionCommand {
-  string command_id = 1;
+message VerifyMessageCommand {
+  int64 command_id = 1;
   Message message = 2;
-
-  reserved 3 to 64;
-}
-
-message ReportMessageConsumptionResultRequest {
-  string command_id = 1;
-  google.rpc.Status status = 2;
-
-  reserved 3 to 64;
 }
 
-message ReportMessageConsumptionResultResponse {
-  ResponseCommon common = 1;
-
-  reserved 2 to 64;
+message VerifyMessageResult {
+  int64 command_id = 1;
+  Status status = 2;
 }
 
 message RecoverOrphanedTransactionCommand {
-  Message orphaned_transactional_message = 1;
-  string transaction_id = 2;
-
-  reserved 3 to 64;
+  int64 command_id = 1;
+  Message orphaned_transactional_message = 2;
+  string transaction_id = 3;
 }
 
-message PollCommandRequest {
+message Settings {
   string client_id = 1;
-  repeated Resource topics = 2;
-  oneof group {
-    Resource producer_group = 3;
-    Resource consumer_group = 4;
-  }
-
-  reserved 5 to 64;
+  string access_point = 2;
+  Publish publish = 3;
+  Subscription subscription = 4;
+  Authentication authentication = 5;
 }
 
-message PollCommandResponse {
-  oneof type {
-    // Default command when no new command need to be delivered.
-    NoopCommand noop_command = 1;
+message TelemetryCommand {
+  oneof command {
+    Settings settings = 1;
+
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
+
     // Request client to print thread stack trace.
-    PrintThreadStackTraceCommand print_thread_stack_trace_command = 2;
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
+
+    ThreadStackTrace thread_stack_trace = 4;
+
     // Request client to verify the consumption of the appointed message.
-    VerifyMessageConsumptionCommand verify_message_consumption_command = 3;
-    // Request client to recover the orphaned transaction message.
-    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 4;
-  }
+    VerifyMessageCommand verify_message_command = 5;
 
-  reserved 5 to 64;
+    VerifyMessageResult verify_message_result = 6;
+  }
 }
 
 message NotifyClientTerminationRequest {
-  oneof group {
-    Resource producer_group = 1;
-    Resource consumer_group = 2;
-  }
-  string client_id = 3;
+  Resource group = 1;
+  string client_id = 2;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+  Resource group = 1;
+  Resource topic = 2;
+
+  // Unique receipt handle to identify message to change
+  string receipt_handle = 3;
 
-  reserved 4 to 64;
+  // New invisible duration
+  google.protobuf.Duration invisible_duration = 4;
 }
 
-message NotifyClientTerminationResponse {
-  ResponseCommon common = 1;
+message ChangeInvisibleDurationResponse {
+  Status status = 1;
 
-  reserved 2 to 64;
+  // Server may generate a new receipt handle for the message.
+  string receipt_handle = 2;
 }
 
 // For all the RPCs in MessagingService, the following error handling policies
@@ -374,10 +257,10 @@ message NotifyClientTerminationResponse {
 // errors raise, return a response with common.status.code == `INTERNAL`.
 service MessagingService {
 
-  // Querys the route entries of the requested topic in the perspective of the
+  // Queries the route entries of the requested topic in the perspective of the
   // given endpoints. On success, servers should return a collection of
-  // addressable partitions. Note servers may return customized route entries
-  // based on endpoints provided.
+  // addressable message-queues. Note servers may return customized route
+  // entries based on endpoints provided.
   //
   // If the requested topic doesn't exist, returns `NOT_FOUND`.
   // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
@@ -393,14 +276,9 @@ service MessagingService {
   // returns `INVALID_ARGUMENT`
   rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
 
-  // Checks the health status of message server, returns `OK` if services are
-  // online and serving. Clients may use this RPC to detect availability of
-  // messaging service, and take isolation actions when necessary.
-  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
-
   // Delivers messages to brokers.
   // Clients may further:
-  // 1. Refine a message destination to topic partition which fulfills parts of
+  // 1. Refine a message destination to message-queues which fulfills parts of
   // FIFO semantic;
   // 2. Flag a message as transactional, which keeps it invisible to consumers
   // until it commits;
@@ -413,16 +291,16 @@ service MessagingService {
   // If the destination topic doesn't exist, returns `NOT_FOUND`.
   rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
 
-  // Querys the assigned partition route info of a topic for current consumer,
-  // the returned assignment result is descided by server-side load balacner.
+  // Queries the assigned route info of a topic for current consumer,
+  // the returned assignment result is decided by server-side load balancer.
   //
   // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
-  // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
   rpc QueryAssignment(QueryAssignmentRequest)
       returns (QueryAssignmentResponse) {}
 
   // Receives messages from the server in batch manner, returns a set of
-  // messages if success. The received messages should be acked or uacked after
+  // messages if success. The received messages should be acked or nacked after
   // processed.
   //
   // If the pending concurrent receive requests exceed the quota of the given
@@ -442,14 +320,6 @@ service MessagingService {
   // `INVALID_ARGUMENT`.
   rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
 
-  // Signals that the message has not been successfully processed. The message
-  // server should resend the message follow the retry policy defined at
-  // server-side.
-  //
-  // If the corresponding topic or consumer group doesn't exist, returns
-  // `NOT_FOUND`.
-  rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
-
   // Forwards one message to dead letter queue if the DeadLetterPolicy is
   // triggered by this message at client-side, return `OK` if success.
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
@@ -458,14 +328,14 @@ service MessagingService {
   // Commits or rollback one transactional message.
   rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
 
-  // Querys the offset of the specific partition, returns the offset with `OK`
-  // if success. The message server should maintain a numerical offset for each
-  // message in a parition.
+  // Queries the offset of the specific message queue, returns the offset with
+  // `OK` if success. The message server should maintain a numerical offset for
+  // each message in a message-queue.
   rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
 
-  // Pulls messages from the specific partition, returns a set of messages with
-  // next pull offset. The pulled messages can't be acked or nacked, while the
-  // client is responsible for manage offesets for consumer, typically update
+  // Pulls messages from the specific message-queue, returns a set of messages
+  // with next pull offset. The pulled messages can't be acked or nacked, while
+  // the client is responsible for manage offsets for consumer, typically update
   // consume offset to local memory or a third-party storage service.
   //
   // If the pending concurrent receive requests exceed the quota of the given
@@ -476,35 +346,24 @@ service MessagingService {
   // Please note that client may suffer from false empty responses.
   rpc PullMessage(PullMessageRequest) returns (PullMessageResponse) {}
 
-  // Multiplexing RPC(s) for various polling requests, which issue different
-  // commands to client.
-  //
-  // Sometimes client may need to receive and process the command from server.
-  // To prevent the complexity of streaming RPC(s), a unary RPC using
-  // long-polling is another solution.
+  // Once a client starts, it would immediately establish bi-lateral stream
+  // RPCs with brokers, reporting its settings as the initiative command.
   //
-  // To mark the request-response of corresponding command, `command_id` in
-  // message is recorded in the subsequent RPC(s). For example, after receiving
-  // command of printing thread stack trace, client would send
-  // `ReportMessageConsumptionResultRequest` to server, which contain both of
-  // the stack trace and `command_id`.
-  //
-  // At same time, `NoopCommand` is delivered from server when no new command is
-  // needed, it is essential for client to maintain the ping-pong.
-  //
-  rpc PollCommand(PollCommandRequest) returns (PollCommandResponse) {}
-
-  // After receiving the corresponding polling command, the thread stack trace
-  // is reported to the server.
-  rpc ReportThreadStackTrace(ReportThreadStackTraceRequest)
-      returns (ReportThreadStackTraceResponse) {}
-
-  // After receiving the corresponding polling command, the consumption result
-  // of appointed message is reported to the server.
-  rpc ReportMessageConsumptionResult(ReportMessageConsumptionResultRequest)
-      returns (ReportMessageConsumptionResultResponse) {}
+  // When servers have need of inspecting client status, they would issue
+  // telemetry commands to clients. After executing received instructions,
+  // clients shall report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
 
   // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest)
       returns (NotifyClientTerminationResponse) {}
+
+  // Once a message is retrieved from consume queue on behalf of the group, it
+  // will be kept invisible to other clients of the same group for a period of
+  // time. The message is supposed to be processed within the invisible
+  // duration. If the client, which is in charge of the invisible message, is
+  // not capable of processing the message timely, it may use
+  // ChangeInvisibleDuration to lengthen invisible duration.
+  rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest)
+      returns (ChangeInvisibleDurationResponse) {}
 }
\ No newline at end of file
diff --git a/proto/google/rpc/error_details.proto b/proto/google/rpc/error_details.proto
deleted file mode 100644
index aca01eb..0000000
--- a/proto/google/rpc/error_details.proto
+++ /dev/null
@@ -1,247 +0,0 @@
-// Copyright 2020 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.rpc;
-
-import "google/protobuf/duration.proto";
-
-option go_package = "google.golang.org/genproto/googleapis/rpc/errdetails;errdetails";
-option java_multiple_files = true;
-option java_outer_classname = "ErrorDetailsProto";
-option java_package = "com.google.rpc";
-option objc_class_prefix = "RPC";
-
-// Describes when the clients can retry a failed request. Clients could ignore
-// the recommendation here or retry when this information is missing from error
-// responses.
-//
-// It's always recommended that clients should use exponential backoff when
-// retrying.
-//
-// Clients should wait until `retry_delay` amount of time has passed since
-// receiving the error response before retrying.  If retrying requests also
-// fail, clients should use an exponential backoff scheme to gradually increase
-// the delay between retries based on `retry_delay`, until either a maximum
-// number of retries have been reached or a maximum retry delay cap has been
-// reached.
-message RetryInfo {
-  // Clients should wait at least this long between retrying the same request.
-  google.protobuf.Duration retry_delay = 1;
-}
-
-// Describes additional debugging info.
-message DebugInfo {
-  // The stack trace entries indicating where the error occurred.
-  repeated string stack_entries = 1;
-
-  // Additional debugging information provided by the server.
-  string detail = 2;
-}
-
-// Describes how a quota check failed.
-//
-// For example if a daily limit was exceeded for the calling project,
-// a service could respond with a QuotaFailure detail containing the project
-// id and the description of the quota limit that was exceeded.  If the
-// calling project hasn't enabled the service in the developer console, then
-// a service could respond with the project id and set `service_disabled`
-// to true.
-//
-// Also see RetryInfo and Help types for other details about handling a
-// quota failure.
-message QuotaFailure {
-  // A message type used to describe a single quota violation.  For example, a
-  // daily quota or a custom quota that was exceeded.
-  message Violation {
-    // The subject on which the quota check failed.
-    // For example, "clientip:<ip address of client>" or "project:<Google
-    // developer project id>".
-    string subject = 1;
-
-    // A description of how the quota check failed. Clients can use this
-    // description to find more about the quota configuration in the service's
-    // public documentation, or find the relevant quota limit to adjust through
-    // developer console.
-    //
-    // For example: "Service disabled" or "Daily Limit for read operations
-    // exceeded".
-    string description = 2;
-  }
-
-  // Describes all quota violations.
-  repeated Violation violations = 1;
-}
-
-// Describes the cause of the error with structured details.
-//
-// Example of an error when contacting the "pubsub.googleapis.com" API when it
-// is not enabled:
-//     { "reason": "API_DISABLED"
-//       "domain": "googleapis.com"
-//       "metadata": {
-//         "resource": "projects/123",
-//         "service": "pubsub.googleapis.com"
-//       }
-//     }
-//
-// This response indicates that the pubsub.googleapis.com API is not enabled.
-//
-// Example of an error that is returned when attempting to create a Spanner
-// instance in a region that is out of stock:
-//     { "reason": "STOCKOUT"
-//       "domain": "spanner.googleapis.com",
-//       "metadata": {
-//         "availableRegions": "us-central1,us-east2"
-//       }
-//     }
-message ErrorInfo {
-  // The reason of the error. This is a constant value that identifies the
-  // proximate cause of the error. Error reasons are unique within a particular
-  // domain of errors. This should be at most 63 characters and match
-  // /[A-Z0-9_]+/.
-  string reason = 1;
-
-  // The logical grouping to which the "reason" belongs. The error domain
-  // is typically the registered service name of the tool or product that
-  // generates the error. Example: "pubsub.googleapis.com". If the error is
-  // generated by some common infrastructure, the error domain must be a
-  // globally unique value that identifies the infrastructure. For Google API
-  // infrastructure, the error domain is "googleapis.com".
-  string domain = 2;
-
-  // Additional structured details about this error.
-  //
-  // Keys should match /[a-zA-Z0-9-_]/ and be limited to 64 characters in
-  // length. When identifying the current value of an exceeded limit, the units
-  // should be contained in the key, not the value.  For example, rather than
-  // {"instanceLimit": "100/request"}, should be returned as,
-  // {"instanceLimitPerRequest": "100"}, if the client exceeds the number of
-  // instances that can be created in a single (batch) request.
-  map<string, string> metadata = 3;
-}
-
-// Describes what preconditions have failed.
-//
-// For example, if an RPC failed because it required the Terms of Service to be
-// acknowledged, it could list the terms of service violation in the
-// PreconditionFailure message.
-message PreconditionFailure {
-  // A message type used to describe a single precondition failure.
-  message Violation {
-    // The type of PreconditionFailure. We recommend using a service-specific
-    // enum type to define the supported precondition violation subjects. For
-    // example, "TOS" for "Terms of Service violation".
-    string type = 1;
-
-    // The subject, relative to the type, that failed.
-    // For example, "google.com/cloud" relative to the "TOS" type would indicate
-    // which terms of service is being referenced.
-    string subject = 2;
-
-    // A description of how the precondition failed. Developers can use this
-    // description to understand how to fix the failure.
-    //
-    // For example: "Terms of service not accepted".
-    string description = 3;
-  }
-
-  // Describes all precondition violations.
-  repeated Violation violations = 1;
-}
-
-// Describes violations in a client request. This error type focuses on the
-// syntactic aspects of the request.
-message BadRequest {
-  // A message type used to describe a single bad request field.
-  message FieldViolation {
-    // A path leading to a field in the request body. The value will be a
-    // sequence of dot-separated identifiers that identify a protocol buffer
-    // field. E.g., "field_violations.field" would identify this field.
-    string field = 1;
-
-    // A description of why the request element is bad.
-    string description = 2;
-  }
-
-  // Describes all violations in a client request.
-  repeated FieldViolation field_violations = 1;
-}
-
-// Contains metadata about the request that clients can attach when filing a bug
-// or providing other forms of feedback.
-message RequestInfo {
-  // An opaque string that should only be interpreted by the service generating
-  // it. For example, it can be used to identify requests in the service's logs.
-  string request_id = 1;
-
-  // Any data that was used to serve this request. For example, an encrypted
-  // stack trace that can be sent back to the service provider for debugging.
-  string serving_data = 2;
-}
-
-// Describes the resource that is being accessed.
-message ResourceInfo {
-  // A name for the type of resource being accessed, e.g. "sql table",
-  // "cloud storage bucket", "file", "Google calendar"; or the type URL
-  // of the resource: e.g. "type.googleapis.com/google.pubsub.v1.Topic".
-  string resource_type = 1;
-
-  // The name of the resource being accessed.  For example, a shared calendar
-  // name: "[email protected]", if the current
-  // error is [google.rpc.Code.PERMISSION_DENIED][google.rpc.Code.PERMISSION_DENIED].
-  string resource_name = 2;
-
-  // The owner of the resource (optional).
-  // For example, "user:<owner email>" or "project:<Google developer project
-  // id>".
-  string owner = 3;
-
-  // Describes what error is encountered when accessing this resource.
-  // For example, updating a cloud project may require the `writer` permission
-  // on the developer console project.
-  string description = 4;
-}
-
-// Provides links to documentation or for performing an out of band action.
-//
-// For example, if a quota check failed with an error indicating the calling
-// project hasn't enabled the accessed service, this can contain a URL pointing
-// directly to the right place in the developer console to flip the bit.
-message Help {
-  // Describes a URL link.
-  message Link {
-    // Describes what the link offers.
-    string description = 1;
-
-    // The URL of the link.
-    string url = 2;
-  }
-
-  // URL(s) pointing to additional information on handling the current error.
-  repeated Link links = 1;
-}
-
-// Provides a localized error message that is safe to return to the user
-// which can be attached to an RPC error.
-message LocalizedMessage {
-  // The locale used following the specification defined at
-  // http://www.rfc-editor.org/rfc/bcp/bcp47.txt.
-  // Examples are: "en-US", "fr-CH", "es-MX"
-  string locale = 1;
-
-  // The localized error message in the above locale.
-  string message = 2;
-}
diff --git a/proto/google/rpc/status.proto b/proto/google/rpc/status.proto
deleted file mode 100644
index 3b1f7a9..0000000
--- a/proto/google/rpc/status.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2020 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.rpc;
-
-import "google/protobuf/any.proto";
-
-option cc_enable_arenas = true;
-option go_package = "google.golang.org/genproto/googleapis/rpc/status;status";
-option java_multiple_files = true;
-option java_outer_classname = "StatusProto";
-option java_package = "com.google.rpc";
-option objc_class_prefix = "RPC";
-
-// The `Status` type defines a logical error model that is suitable for
-// different programming environments, including REST APIs and RPC APIs. It is
-// used by [gRPC](https://github.com/grpc). Each `Status` message contains
-// three pieces of data: error code, error message, and error details.
-//
-// You can find out more about this error model and how to work with it in the
-// [API Design Guide](https://cloud.google.com/apis/design/errors).
-message Status {
-  // The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
-  int32 code = 1;
-
-  // A developer-facing error message, which should be in English. Any
-  // user-facing error message should be localized and sent in the
-  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
-  string message = 2;
-
-  // A list of messages that carry the error details.  There is a common set of
-  // message types for APIs to use.
-  repeated google.protobuf.Any details = 3;
-}
diff --git a/src/bin/server.rs b/src/bin/server.rs
new file mode 100644
index 0000000..d9c1eee
--- /dev/null
+++ b/src/bin/server.rs
@@ -0,0 +1,15 @@
+use rocketmq::pb::messaging_service_server::MessagingServiceServer;
+use rocketmq::service::ServerService;
+use tonic::transport::Server;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let address = "127.0.0.1:5001".parse().unwrap();
+    let service = ServerService::default();
+    println!("Server listens {}", address);
+    Server::builder()
+        .add_service(MessagingServiceServer::new(service))
+        .serve(address)
+        .await?;
+    Ok(())
+}
diff --git a/src/lib.rs b/src/lib.rs
index 346484c..813a8ee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,26 +1,3 @@
-pub mod apache {
-    pub mod rocketmq {
-        pub mod v1 {
-            tonic::include_proto!("apache.rocketmq.v1");
-        }
-    }
-}
-
-pub mod google {
-    pub mod rpc {
-        tonic::include_proto!("google.rpc");
-    }
-}
-
-pub mod org_apache_rocketmq {
-
-    #[derive(Debug)]
-    struct RpcClient {}
-}
-
-#[cfg(test)]
-pub mod tests {
-
-    #[test]
-    fn it_works() {}
-}
+pub mod pb;
+pub mod rocketmq;
+pub mod service;
diff --git a/src/pb.rs b/src/pb.rs
new file mode 100644
index 0000000..d30fe30
--- /dev/null
+++ b/src/pb.rs
@@ -0,0 +1 @@
+tonic::include_proto!("apache.rocketmq.v2");
diff --git a/src/rocketmq.rs b/src/rocketmq.rs
new file mode 100644
index 0000000..8c4f440
--- /dev/null
+++ b/src/rocketmq.rs
@@ -0,0 +1,90 @@
+use crate::pb::{
+    messaging_service_client::MessagingServiceClient, QueryRouteRequest, QueryRouteResponse,
+    Resource,
+};
+use rustls::client::ServerCertVerifier;
+use tonic::{
+    transport::{Channel, ClientTlsConfig},
+    Request, Response,
+};
+
+pub struct RpcClient {
+    stub: MessagingServiceClient<Channel>,
+    remote_address: String,
+}
+
+struct TrustAllCertVerifier;
+
+impl ServerCertVerifier for TrustAllCertVerifier {
+    fn verify_server_cert(
+        &self,
+        end_entity: &rustls::Certificate,
+        intermediates: &[rustls::Certificate],
+        server_name: &rustls::ServerName,
+        scts: &mut dyn Iterator<Item = &[u8]>,
+        ocsp_response: &[u8],
+        now: std::time::SystemTime,
+    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
+        Ok(rustls::client::ServerCertVerified::assertion())
+    }
+}
+
+impl RpcClient {
+    pub async fn new(target: &'static str) -> Result<RpcClient, Box<dyn std::error::Error>> {
+        let remote_address = String::from(target);
+
+        let mut channel = Channel::from_shared(target)?
+            .tcp_nodelay(true)
+            .connect_timeout(std::time::Duration::from_secs(3));
+        if remote_address.starts_with("https://") {
+            let verifier = std::sync::Arc::new(TrustAllCertVerifier {});
+            let rustls_config = rustls::client::ClientConfig::builder()
+                .with_safe_defaults()
+                .with_custom_certificate_verifier(verifier)
+                .with_no_client_auth();
+            //TODO: Disable verify server certificate
+            let tls_config = ClientTlsConfig::new();
+            channel = channel.tls_config(tls_config)?;
+        }
+        let channel = channel.connect().await?;
+        let stub = MessagingServiceClient::new(channel);
+        Ok(RpcClient {
+            stub,
+            remote_address,
+        })
+    }
+
+    pub async fn query_route(
+        &mut self,
+        request: QueryRouteRequest,
+    ) -> Result<Response<QueryRouteResponse>, Box<dyn std::error::Error>> {
+        let req = Request::new(request);
+        Ok(self.stub.query_route(req).await?)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_connect() {
+        let target = "http://127.0.0.1:5001";
+        let mut rpc_client = RpcClient::new(target)
+            .await
+            .expect("Should be able to connect");
+        let topic = Resource {
+            resource_namespace: String::from("arn"),
+            name: String::from("TestTopic"),
+        };
+        let request = QueryRouteRequest {
+            topic: Some(topic),
+            endpoints: None,
+        };
+
+        let reply = rpc_client
+            .query_route(request)
+            .await
+            .expect("Failed to query route");
+    }
+}
diff --git a/src/service.rs b/src/service.rs
new file mode 100644
index 0000000..08c2c8f
--- /dev/null
+++ b/src/service.rs
@@ -0,0 +1,156 @@
+use crate::pb::{
+    messaging_service_server::MessagingService, messaging_service_server::MessagingServiceServer,
+    AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest,
+    ChangeInvisibleDurationResponse, EndTransactionRequest, EndTransactionResponse,
+    ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse,
+    HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
+    NotifyClientTerminationResponse, PullMessageRequest, PullMessageResponse,
+    QueryAssignmentRequest, QueryAssignmentResponse, QueryOffsetRequest, QueryOffsetResponse,
+    QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse,
+    SendMessageRequest, SendMessageResponse, TelemetryCommand,
+};
+use futures::Stream;
+use tonic::{transport::Server, Request, Response, Status, Streaming};
+
+#[derive(Default)]
+pub struct ServerService {}
+
+type ResponseStream =
+    std::pin::Pin<Box<dyn Stream<Item = Result<TelemetryCommand, Status>> + Send>>;
+
+#[tonic::async_trait]
+impl MessagingService for ServerService {
+    type TelemetryStream = ResponseStream;
+    async fn query_route(
+        &self,
+        request: Request<QueryRouteRequest>,
+    ) -> Result<Response<QueryRouteResponse>, Status> {
+        println!("{:?}", request);
+        let reply = QueryRouteResponse {
+            status: None,
+            message_queues: vec![],
+        };
+        Ok(Response::new(reply))
+    }
+
+    async fn heartbeat(
+        &self,
+        request: Request<HeartbeatRequest>,
+    ) -> Result<Response<HeartbeatResponse>, Status> {
+        let reply = HeartbeatResponse { status: None };
+        Ok(Response::new(reply))
+    }
+
+    async fn send_message(
+        &self,
+        request: Request<SendMessageRequest>,
+    ) -> Result<Response<SendMessageResponse>, Status> {
+        let reply = SendMessageResponse {
+            status: None,
+            receipts: vec![],
+        };
+        Ok(Response::new(reply))
+    }
+
+    async fn query_assignment(
+        &self,
+        request: Request<QueryAssignmentRequest>,
+    ) -> Result<Response<QueryAssignmentResponse>, Status> {
+        let reply = QueryAssignmentResponse {
+            status: None,
+            assignments: vec![],
+        };
+        Ok(Response::new(reply))
+    }
+
+    async fn receive_message(
+        &self,
+        request: Request<ReceiveMessageRequest>,
+    ) -> Result<Response<ReceiveMessageResponse>, Status> {
+        let reply = ReceiveMessageResponse {
+            status: None,
+            delivery_timestamp: None,
+            invisible_duration: None,
+            messages: vec![],
+        };
+        Ok(Response::new(reply))
+    }
+
+    async fn ack_message(
+        &self,
+        request: Request<AckMessageRequest>,
+    ) -> Result<Response<AckMessageResponse>, Status> {
+        let reply = AckMessageResponse { status: None };
+        Ok(Response::new(reply))
+    }
+
+    async fn forward_message_to_dead_letter_queue(
+        &self,
+        request: Request<ForwardMessageToDeadLetterQueueRequest>,
+    ) -> Result<Response<ForwardMessageToDeadLetterQueueResponse>, Status> {
+        let reply = ForwardMessageToDeadLetterQueueResponse { status: None };
+        Ok(Response::new(reply))
+    }
+
+    /// Commits or rolls back one transactional message.
+    async fn end_transaction(
+        &self,
+        request: Request<EndTransactionRequest>,
+    ) -> Result<Response<EndTransactionResponse>, Status> {
+        let reply = EndTransactionResponse { status: None };
+        Ok(Response::new(reply))
+    }
+
+    async fn query_offset(
+        &self,
+        request: Request<QueryOffsetRequest>,
+    ) -> Result<Response<QueryOffsetResponse>, Status> {
+        let reply = QueryOffsetResponse {
+            status: None,
+            offset: 0,
+        };
+        Ok(Response::new(reply))
+    }
+
+    async fn pull_message(
+        &self,
+        request: Request<PullMessageRequest>,
+    ) -> Result<Response<PullMessageResponse>, Status> {
+        let reply = PullMessageResponse {
+            status: None,
+            min_offset: 0,
+            next_offset: 0,
+            max_offset: 10,
+            messages: vec![],
+        };
+        Ok(Response::new(reply))
+    }
+
+    /// Telemetry RPC handler; not implemented yet, so it always returns an aborted status.
+    async fn telemetry(
+        &self,
+        request: Request<Streaming<TelemetryCommand>>,
+    ) -> Result<Response<Self::TelemetryStream>, Status> {
+        Err(Status::aborted("NotImplemented"))
+    }
+
+    /// Notify the server that the client is terminated.
+    async fn notify_client_termination(
+        &self,
+        request: Request<NotifyClientTerminationRequest>,
+    ) -> Result<Response<NotifyClientTerminationResponse>, Status> {
+        let reply = NotifyClientTerminationResponse { status: None };
+        Ok(Response::new(reply))
+    }
+
+    async fn change_invisible_duration(
+        &self,
+        request: Request<ChangeInvisibleDurationRequest>,
+    ) -> Result<Response<ChangeInvisibleDurationResponse>, Status> {
+        let reply = ChangeInvisibleDurationResponse {
+            status: None,
+            receipt_handle: String::from("receipt-handle"),
+        };
+        Ok(Response::new(reply))
+    }
+}

Reply via email to