This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 74ff5fa Supports grpc+json and the curl tool (#148)
74ff5fa is described below
commit 74ff5fae8750dbcb83a038ca23cfe69e6f57748c
Author: Urara <[email protected]>
AuthorDate: Sat Aug 19 00:30:14 2023 +0800
Supports grpc+json and the curl tool (#148)
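* feat: 支持curl直接访问 (support direct access via curl)
将json消息转化为grpc消息,继而交给grpc handle进行处理,并以json格式返回数据
(Converts the JSON message into a gRPC message, hands it to the gRPC
handler for processing, and returns the data in JSON format.)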
* fix(fix bug):
* refactor: 将生成Codec的逻辑转移到了TripleServer (moved the Codec-generation logic into TripleServer)
* Feat: Added support for JSON encoding types(#145)
* fix: Fixed a bug related to compression(#145)
* perf: Improved code reuse-related logic(#145)
Optimized the logic of TripleServer and TripleClient, achieving code
reuse.
Merged encoding and encoding_json
* style: Formatted the code according to the cargo fmt standard(#145)
* perf: Resolved the warnings from cargo check(#145)
* perf: Optimized the code structure and removed redundant code(#145)
* perf: Optimized the configuration format as well as the configuration
loading method.(#145)
* perf: Removed configuration using serialization methods.(#145)
The RPC client now exclusively uses protobuf for serialization.
---------
Co-authored-by: urara <[email protected]>
---
Cargo.toml | 3 +-
dubbo-build/Cargo.toml | 4 +-
dubbo-build/src/client.rs | 18 ----
dubbo-build/src/prost.rs | 2 +
dubbo-build/src/server.rs | 32 +-----
dubbo/Cargo.toml | 2 +-
dubbo/src/codegen.rs | 2 +-
dubbo/src/triple/client/builder.rs | 2 +-
dubbo/src/triple/client/triple.rs | 115 ++++++++++++++------
dubbo/src/triple/decode.rs | 63 ++++++++++-
dubbo/src/triple/encode.rs | 49 +++++----
dubbo/src/triple/server/triple.rs | 126 +++++++++++++++-------
examples/echo/Cargo.toml | 7 +-
examples/echo/src/echo/client.rs | 2 +-
examples/echo/src/generated/grpc.examples.echo.rs | 58 ++++------
examples/greeter/Cargo.toml | 7 +-
examples/greeter/src/greeter/server.rs | 2 +-
17 files changed, 296 insertions(+), 198 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5270ba4..ad39f08 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,6 +57,7 @@ serde_yaml = "0.9.4" # yaml file parser
once_cell = "1.16.0"
itertools = "0.10.1"
bytes = "1.0"
-
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
diff --git a/dubbo-build/Cargo.toml b/dubbo-build/Cargo.toml
index 3276a68..1ca031d 100644
--- a/dubbo-build/Cargo.toml
+++ b/dubbo-build/Cargo.toml
@@ -10,9 +10,9 @@ repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-prost = "0.11.0"
+prost = "0.11.9"
prettyplease = {version = "0.1"}
proc-macro2 = "1.0"
quote = "1.0"
syn = "1.0"
-prost-build = "0.11.1"
+prost-build = "0.11.9"
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index bfcfe45..af32b64 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -20,8 +20,6 @@ use crate::{Method, Service};
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
-pub const CODEC_PATH: &str = "dubbo::codegen::ProstCodec";
-
/// Generate service for client.
///
/// This takes some `Service` and will generate a `TokenStream` that contains
@@ -167,7 +165,6 @@ fn generate_unary<T: Method>(
compile_well_known_types: bool,
path: String,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
let method_name = method.identifier();
@@ -177,14 +174,12 @@ fn generate_unary<T: Method>(
&mut self,
request: Request<#request>,
) -> Result<Response<#response>, dubbo::status::Status> {
- let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.unary(
request,
- codec,
path,
invocation,
).await
@@ -199,9 +194,7 @@ fn generate_server_streaming<T: Method>(
compile_well_known_types: bool,
path: String,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
-
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
let method_name = method.identifier();
@@ -210,15 +203,12 @@ fn generate_server_streaming<T: Method>(
&mut self,
request: Request<#request>,
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
-
- let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.server_streaming(
request,
- codec,
path,
invocation,
).await
@@ -233,9 +223,7 @@ fn generate_client_streaming<T: Method>(
compile_well_known_types: bool,
path: String,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
-
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
let method_name = method.identifier();
@@ -244,14 +232,12 @@ fn generate_client_streaming<T: Method>(
&mut self,
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<#response>, dubbo::status::Status> {
- let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.client_streaming(
request,
- codec,
path,
invocation,
).await
@@ -266,9 +252,7 @@ fn generate_streaming<T: Method>(
compile_well_known_types: bool,
path: String,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
-
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
let method_name = method.identifier();
@@ -277,14 +261,12 @@ fn generate_streaming<T: Method>(
&mut self,
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
- let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.bidi_streaming(
request,
- codec,
path,
invocation,
).await
diff --git a/dubbo-build/src/prost.rs b/dubbo-build/src/prost.rs
index 3918fd3..1cd92f6 100644
--- a/dubbo-build/src/prost.rs
+++ b/dubbo-build/src/prost.rs
@@ -93,6 +93,8 @@ impl Builder {
PathBuf::from(std::env::var("OUT_DIR").unwrap())
};
config.out_dir(out_dir);
+ config.type_attribute(".", "#[derive(serde::Serialize,
serde::Deserialize)]");
+ config.type_attribute(".", "#[serde(default)]");
if self.compile_well_known_types {
config.compile_well_known_types();
diff --git a/dubbo-build/src/server.rs b/dubbo-build/src/server.rs
index 4dbbc9d..3ccb52a 100644
--- a/dubbo-build/src/server.rs
+++ b/dubbo-build/src/server.rs
@@ -21,8 +21,6 @@ use proc_macro2::{Span, TokenStream};
use quote::quote;
use syn::{Ident, Lit, LitStr};
-pub const CODEC_PATH: &str = "dubbo::codegen::ProstCodec";
-
/// Generate service for Server.
///
/// This takes some `Service` and will generate a `TokenStream` that contains
@@ -330,8 +328,6 @@ fn generate_unary<T: Method>(
method_ident: Ident,
server_trait: Ident,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
let service_ident = quote::format_ident!("{}Server", method.identifier());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
@@ -354,16 +350,11 @@ fn generate_unary<T: Method>(
Box::pin(fut)
}
}
-
let fut = async move {
- let mut server = TripleServer::new(
- #codec_name::<#response, #request>::default()
- );
-
+ let mut server = TripleServer::<#request,#response>::new();
let res = server.unary(#service_ident { inner }, req).await;
Ok(res)
};
-
Box::pin(fut)
}
}
@@ -375,8 +366,6 @@ fn generate_server_streaming<T: Method>(
method_ident: Ident,
server_trait: Ident,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
let service_ident = quote::format_ident!("{}Server", method.identifier());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
@@ -402,12 +391,8 @@ fn generate_server_streaming<T: Method>(
Box::pin(fut)
}
}
-
let fut = async move {
- let mut server = TripleServer::new(
- #codec_name::<#response, #request>::default()
- );
-
+ let mut server = TripleServer::<#request,#response>::new();
let res = server.server_streaming(#service_ident { inner },
req).await;
Ok(res)
};
@@ -426,7 +411,6 @@ fn generate_client_streaming<T: Method>(
let service_ident = quote::format_ident!("{}Server", method.identifier());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
quote! {
#[allow(non_camel_case_types)]
@@ -450,10 +434,7 @@ fn generate_client_streaming<T: Method>(
}
let fut = async move {
- let mut server = TripleServer::new(
- #codec_name::<#response, #request>::default()
- );
-
+ let mut server = TripleServer::<#request,#response>::new();
let res = server.client_streaming(#service_ident { inner },
req).await;
Ok(res)
};
@@ -469,8 +450,6 @@ fn generate_streaming<T: Method>(
method_ident: Ident,
server_trait: Ident,
) -> TokenStream {
- let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
let service_ident = quote::format_ident!("{}Server", method.identifier());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
@@ -499,10 +478,7 @@ fn generate_streaming<T: Method>(
}
let fut = async move {
- let mut server = TripleServer::new(
- #codec_name::<#response, #request>::default()
- );
-
+ let mut server = TripleServer::<#request,#response>::new();
let res = server.bidi_streaming(#service_ident { inner },
req).await;
Ok(res)
};
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index d0ab2a4..f51523d 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -21,7 +21,7 @@ argh = "0.1"
rustls-pemfile = "1.0.0"
tokio-rustls="0.23.4"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs",
"macros", "net", "signal", "full" ] }
-prost = "0.10.4"
+prost = "0.11.9"
async-trait = "0.1.56"
tower-layer.workspace = true
bytes.workspace = true
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 412feb9..8d95c21 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -34,7 +34,7 @@ pub use super::{
registry::{BoxRegistry, Registry},
triple::{
client::TripleClient,
- codec::{prost::ProstCodec, Codec},
+ codec::{prost::ProstCodec, serde_codec::SerdeCodec, Codec},
decode::Decoding,
server::{
service::{ClientStreamingSvc, ServerStreamingSvc, StreamingSvc,
UnarySvc},
diff --git a/dubbo/src/triple/client/builder.rs
b/dubbo/src/triple/client/builder.rs
index 06ecd62..9dae0f9 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -59,7 +59,7 @@ impl ClientBuilder {
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
direct: true,
- host: host.clone().to_string(),
+ host: host.to_string(),
}
}
diff --git a/dubbo/src/triple/client/triple.rs
b/dubbo/src/triple/client/triple.rs
index 124cfcf..f8e1175 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -21,14 +21,22 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
use aws_smithy_http::body::SdkBody;
use http::HeaderValue;
+use prost::Message;
+use serde::{Deserialize, Serialize};
use super::builder::ClientBuilder;
-use crate::codegen::RpcInvocation;
+use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec};
use crate::{
invocation::{IntoStreamingRequest, Metadata, Request, Response},
protocol::BoxInvoker,
- triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding,
encode::encode},
+ status::Status,
+ triple::{
+ codec::{Codec, Decoder, Encoder},
+ compression::CompressionEncoding,
+ decode::Decoding,
+ encode::encode,
+ },
};
#[derive(Debug, Clone, Default)]
@@ -104,12 +112,11 @@ impl TripleClient {
if let Some(_encoding) = self.send_compression_encoding {
req.headers_mut()
.insert("grpc-encoding",
http::HeaderValue::from_static("gzip"));
+ req.headers_mut().insert(
+ "grpc-accept-encoding",
+ http::HeaderValue::from_static("gzip"),
+ );
}
- req.headers_mut().insert(
- "grpc-accept-encoding",
- http::HeaderValue::from_static("gzip"),
- );
-
// const (
// TripleContentType = "application/grpc+proto"
// TripleUserAgent = "grpc-go/1.35.0-dev"
@@ -125,23 +132,27 @@ impl TripleClient {
req
}
- pub async fn unary<C, M1, M2>(
+ pub async fn unary<M1, M2>(
&mut self,
req: Request<M1>,
- mut codec: C,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
- C: Codec<Encode = M1, Decode = M2>,
- M1: Send + Sync + 'static,
- M2: Send + Sync + 'static,
+ M1: Message + Send + Sync + 'static + Serialize,
+ M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> +
Default,
{
+ let is_json = false;
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+ ) = get_codec(is_json);
let req = req.map(|m| stream::once(future::ready(m)));
let body_stream = encode(
- codec.encoder(),
+ encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
+ is_json,
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
@@ -169,7 +180,7 @@ impl TripleClient {
match response {
Ok(v) => {
let resp = v.map(|body| {
- Decoding::new(body, codec.decoder(),
self.send_compression_encoding)
+ Decoding::new(body, decoder,
self.send_compression_encoding, is_json)
});
let (mut parts, body) = Response::from_http(resp).into_parts();
@@ -194,23 +205,27 @@ impl TripleClient {
}
}
- pub async fn bidi_streaming<C, M1, M2>(
+ pub async fn bidi_streaming<M1, M2>(
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
- mut codec: C,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
- C: Codec<Encode = M1, Decode = M2>,
- M1: Send + Sync + 'static,
- M2: Send + Sync + 'static,
+ M1: Message + Send + Sync + 'static + Serialize,
+ M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> +
Default,
{
+ let is_json = false;
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+ ) = get_codec(is_json);
let req = req.into_streaming_request();
let en = encode(
- codec.encoder(),
+ encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
+ is_json,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
@@ -237,7 +252,7 @@ impl TripleClient {
match response {
Ok(v) => {
let resp = v.map(|body| {
- Decoding::new(body, codec.decoder(),
self.send_compression_encoding)
+ Decoding::new(body, decoder,
self.send_compression_encoding, is_json)
});
Ok(Response::from_http(resp))
@@ -246,23 +261,27 @@ impl TripleClient {
}
}
- pub async fn client_streaming<C, M1, M2>(
+ pub async fn client_streaming<M1, M2>(
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
- mut codec: C,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
- C: Codec<Encode = M1, Decode = M2>,
- M1: Send + Sync + 'static,
- M2: Send + Sync + 'static,
+ M1: Message + Send + Sync + 'static + Serialize,
+ M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> +
Default,
{
+ let is_json = false;
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+ ) = get_codec(is_json);
let req = req.into_streaming_request();
let en = encode(
- codec.encoder(),
+ encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
+ is_json,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
@@ -290,7 +309,7 @@ impl TripleClient {
match response {
Ok(v) => {
let resp = v.map(|body| {
- Decoding::new(body, codec.decoder(),
self.send_compression_encoding)
+ Decoding::new(body, decoder,
self.send_compression_encoding, is_json)
});
let (mut parts, body) = Response::from_http(resp).into_parts();
@@ -315,23 +334,27 @@ impl TripleClient {
}
}
- pub async fn server_streaming<C, M1, M2>(
+ pub async fn server_streaming<M1, M2>(
&mut self,
req: Request<M1>,
- mut codec: C,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
- C: Codec<Encode = M1, Decode = M2>,
- M1: Send + Sync + 'static,
- M2: Send + Sync + 'static,
+ M1: Message + Send + Sync + 'static + Serialize,
+ M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> +
Default,
{
+ let is_json = false;
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+ ) = get_codec(is_json);
let req = req.map(|m| stream::once(future::ready(m)));
let en = encode(
- codec.encoder(),
+ encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
+ is_json,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
@@ -357,7 +380,7 @@ impl TripleClient {
match response {
Ok(v) => {
let resp = v.map(|body| {
- Decoding::new(body, codec.decoder(),
self.send_compression_encoding)
+ Decoding::new(body, decoder,
self.send_compression_encoding, is_json)
});
Ok(Response::from_http(resp))
@@ -366,3 +389,25 @@ impl TripleClient {
}
}
}
+
+pub fn get_codec<M1, M2>(
+ is_json: bool,
+) -> (
+ Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+)
+where
+ M1: Message + Send + Sync + 'static + Serialize,
+ M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
+{
+ match is_json {
+ true => {
+ let mut codec = SerdeCodec::<M1, M2>::default();
+ (Box::new(codec.decoder()), Box::new(codec.encoder()))
+ }
+ false => {
+ let mut codec = ProstCodec::<M1, M2>::default();
+ (Box::new(codec.decoder()), Box::new(codec.encoder()))
+ }
+ }
+}
diff --git a/dubbo/src/triple/decode.rs b/dubbo/src/triple/decode.rs
index 07c1160..875bbea 100644
--- a/dubbo/src/triple/decode.rs
+++ b/dubbo/src/triple/decode.rs
@@ -38,21 +38,27 @@ pub struct Decoding<T> {
trailers: Option<Metadata>,
compress: Option<CompressionEncoding>,
decompress_buf: BytesMut,
+ is_json: bool,
}
#[derive(PartialEq)]
enum State {
ReadHeader,
+ ReadJSON,
ReadBody { len: usize, is_compressed: bool },
Error,
}
impl<T> Decoding<T> {
- pub fn new<B, D>(body: B, decoder: D, compress:
Option<CompressionEncoding>) -> Self
+ pub fn new<B>(
+ body: B,
+ decoder: Box<dyn Decoder<Item = T, Error = crate::status::Status> +
Send + 'static>,
+ compress: Option<CompressionEncoding>,
+ is_json: bool,
+ ) -> Self
where
B: Body + Send + 'static,
B::Error: Into<crate::Error>,
- D: Decoder<Item = T, Error = crate::status::Status> + Send + 'static,
{
Self {
state: State::ReadHeader,
@@ -65,11 +71,12 @@ impl<T> Decoding<T> {
)
})
.boxed_unsync(),
- decoder: Box::new(decoder),
+ decoder,
buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE),
trailers: None,
compress,
decompress_buf: BytesMut::new(),
+ is_json,
}
}
@@ -91,7 +98,47 @@ impl<T> Decoding<T> {
trailer.map(|data| data.map(Metadata::from_headers))
}
- pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status>
{
+ pub fn decode_json(&mut self) -> Result<Option<T>, crate::status::Status> {
+ if self.state == State::ReadHeader {
+ self.state = State::ReadJSON;
+ return Ok(None);
+ }
+ if let State::ReadJSON = self.state {
+ if self.buf.is_empty() {
+ return Ok(None);
+ }
+ match self.compress {
+ None => self.decompress_buf = self.buf.clone(),
+ Some(compress) => {
+ let len = self.buf.len();
+ if let Err(err) =
+ decompress(compress, &mut self.buf, &mut
self.decompress_buf, len)
+ {
+ return Err(crate::status::Status::new(
+ crate::status::Code::Internal,
+ err.to_string(),
+ ));
+ }
+ }
+ }
+ let len = self.decompress_buf.len();
+ let decoding_result = self
+ .decoder
+ .decode(&mut DecodeBuf::new(&mut self.decompress_buf, len));
+
+ return match decoding_result {
+ Ok(Some(r)) => {
+ self.state = State::ReadHeader;
+ Ok(Some(r))
+ }
+ Ok(None) => Ok(None),
+ Err(err) => Err(err),
+ };
+ }
+ Ok(None)
+ }
+
+ pub fn decode_proto(&mut self) -> Result<Option<T>, crate::status::Status>
{
if self.state == State::ReadHeader {
// buffer is full
if self.buf.remaining() < super::consts::HEADER_SIZE {
@@ -166,6 +213,14 @@ impl<T> Decoding<T> {
Ok(None)
}
+
+ pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status>
{
+ if self.is_json {
+ self.decode_json()
+ } else {
+ self.decode_proto()
+ }
+ }
}
impl<T> Stream for Decoding<T> {
diff --git a/dubbo/src/triple/encode.rs b/dubbo/src/triple/encode.rs
index b837463..1c971d6 100644
--- a/dubbo/src/triple/encode.rs
+++ b/dubbo/src/triple/encode.rs
@@ -28,13 +28,13 @@ use crate::triple::codec::{EncodeBuf, Encoder};
#[allow(unused_must_use)]
pub fn encode<E, B>(
- mut encoder: E,
+ mut encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
resp_body: B,
compression_encoding: Option<CompressionEncoding>,
+ is_json: bool,
) -> impl TryStream<Ok = Bytes, Error = Status>
where
- E: Encoder<Error = Status>,
- B: Stream<Item = Result<E::Item, Status>>,
+ B: Stream<Item = Result<E, Status>>,
{
async_stream::stream! {
let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
@@ -48,12 +48,13 @@ where
loop {
match resp_body.next().await {
Some(Ok(item)) => {
- // 编码数据到缓冲中
- buf.reserve(super::consts::HEADER_SIZE);
+ if !is_json {
+ buf.reserve(super::consts::HEADER_SIZE);
unsafe {
buf.advance_mut(super::consts::HEADER_SIZE);
+ }
}
-
+ // 编码数据到缓冲中
if enable_compress {
uncompression_buf.clear();
@@ -66,16 +67,21 @@ where
} else {
encoder.encode(item, &mut EncodeBuf::new(&mut
buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal,
"encode error".to_string()));
}
-
-
- let len = buf.len() - super::consts::HEADER_SIZE;
- {
+ let result=match is_json{
+ true=>{
+ buf.clone()
+ }
+ false=>{
+ let len = buf.len() - super::consts::HEADER_SIZE;
+ {
let mut buf = &mut buf[..super::consts::HEADER_SIZE];
buf.put_u8(enable_compress as u8);
buf.put_u32(len as u32);
- }
-
- yield Ok(buf.split_to(len +
super::consts::HEADER_SIZE).freeze());
+ }
+ buf.split_to(len + super::consts::HEADER_SIZE)
+ }
+ };
+ yield Ok(result.freeze());
},
Some(Err(err)) => yield Err(err.into()),
None => break,
@@ -85,28 +91,28 @@ where
}
pub fn encode_server<E, B>(
- encoder: E,
+ encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
+ is_json: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
- E: Encoder<Error = Status>,
- B: Stream<Item = Result<E::Item, Status>>,
+ B: Stream<Item = Result<E, Status>>,
{
- let s = encode(encoder, body, compression_encoding).into_stream();
+ let s = encode(encoder, body, compression_encoding, is_json).into_stream();
EncodeBody::new_server(s)
}
pub fn encode_client<E, B>(
- encoder: E,
+ encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
+ is_json: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
- E: Encoder<Error = Status>,
- B: Stream<Item = E::Item>,
+ B: Stream<Item = E>,
{
- let s = encode(encoder, body.map(Ok), compression_encoding).into_stream();
+ let s = encode(encoder, body.map(Ok), compression_encoding,
is_json).into_stream();
EncodeBody::new_client(s)
}
@@ -115,6 +121,7 @@ enum Role {
Server,
Client,
}
+
#[pin_project]
pub struct EncodeBody<S> {
#[pin]
diff --git a/dubbo/src/triple/server/triple.rs
b/dubbo/src/triple/server/triple.rs
index 2c0626c..2586e64 100644
--- a/dubbo/src/triple/server/triple.rs
+++ b/dubbo/src/triple/server/triple.rs
@@ -16,12 +16,18 @@
*/
use futures_util::{future, stream, StreamExt, TryStreamExt};
+use http::HeaderValue;
use http_body::Body;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use std::marker::PhantomData;
use crate::{
invocation::Request,
+ status::Status,
triple::{
- codec::Codec,
+ client::triple::get_codec,
+ codec::{Decoder, Encoder},
compression::{CompressionEncoding, COMPRESSIONS},
decode::Decoding,
encode::encode_server,
@@ -34,23 +40,24 @@ use dubbo_config::BusinessConfig;
pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
pub const GRPC_ENCODING: &str = "grpc-encoding";
-pub struct TripleServer<T> {
- codec: T,
+pub struct TripleServer<M1, M2> {
+ _pd: PhantomData<(M1, M2)>,
compression: Option<CompressionEncoding>,
}
-impl<T> TripleServer<T> {
- pub fn new(codec: T) -> Self {
+impl<M1, M2> TripleServer<M1, M2> {
+ pub fn new() -> Self {
Self {
- codec,
- compression: None,
+ _pd: PhantomData,
+ compression: Some(CompressionEncoding::Gzip),
}
}
}
-impl<T> TripleServer<T>
+impl<M1, M2> TripleServer<M1, M2>
where
- T: Codec,
+ M1: Message + for<'a> Deserialize<'a> + Default + 'static,
+ M2: Message + Serialize + Default + 'static,
{
pub async fn client_streaming<S, B>(
&mut self,
@@ -58,10 +65,20 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
- S: ClientStreamingSvc<T::Decode, Response = T::Encode>,
+ S: ClientStreamingSvc<M1, Response = M2>,
B: Body + Send + 'static,
B::Error: Into<crate::Error> + Send,
{
+ let content_type = req
+ .headers()
+ .get("content-type")
+ .cloned()
+ .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+ let is_json = content_type == "application/json" || content_type ==
"application/grpc+json";
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+ ) = get_codec(is_json);
let mut accept_encoding =
CompressionEncoding::from_accept_encoding(req.headers());
if self.compression.is_none() || accept_encoding.is_none() {
accept_encoding = None;
@@ -73,7 +90,7 @@ where
Err(status) => return status.to_http(),
};
- let req_stream = req.map(|body| Decoding::new(body,
self.codec.decoder(), compression));
+ let req_stream = req.map(|body| Decoding::new(body, decoder,
compression, is_json));
let resp = service.call(Request::from_http(req_stream)).await;
@@ -83,15 +100,15 @@ where
};
let resp_body = encode_server(
- self.codec.encoder(),
+ encoder,
stream::once(future::ready(resp_body)).map(Ok).into_stream(),
accept_encoding,
+ is_json,
);
- parts.headers.insert(
- http::header::CONTENT_TYPE,
- http::HeaderValue::from_static("application/grpc"),
- );
+ parts
+ .headers
+ .insert(http::header::CONTENT_TYPE, content_type);
if let Some(encoding) = accept_encoding {
parts
.headers
@@ -107,11 +124,22 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
- S: StreamingSvc<T::Decode, Response = T::Encode>,
+ S: StreamingSvc<M1, Response = M2>,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
B::Error: Into<crate::Error> + Send,
{
+ let content_type = req
+ .headers()
+ .get("content-type")
+ .cloned()
+ .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+ let is_json = content_type == "application/json" || content_type ==
"application/grpc+json";
+
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+ ) = get_codec(is_json);
// Firstly, get grpc_accept_encoding from http_header, get compression
// Secondly, if server enable compression and compression is valid,
this method should compress response
let mut accept_encoding =
CompressionEncoding::from_accept_encoding(req.headers());
@@ -125,7 +153,7 @@ where
Err(status) => return status.to_http(),
};
- let req_stream = req.map(|body| Decoding::new(body,
self.codec.decoder(), compression));
+ let req_stream = req.map(|body| Decoding::new(body, decoder,
compression, is_json));
let resp = service.call(Request::from_http(req_stream)).await;
@@ -133,12 +161,11 @@ where
Ok(v) => v.into_http().into_parts(),
Err(err) => return err.to_http(),
};
- let resp_body = encode_server(self.codec.encoder(), resp_body,
compression);
+ let resp_body = encode_server(encoder, resp_body, compression,
is_json);
- parts.headers.insert(
- http::header::CONTENT_TYPE,
- http::HeaderValue::from_static("application/grpc"),
- );
+ parts
+ .headers
+ .insert(http::header::CONTENT_TYPE, content_type);
if let Some(encoding) = accept_encoding {
parts
.headers
@@ -154,11 +181,22 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
- S: ServerStreamingSvc<T::Decode, Response = T::Encode>,
+ S: ServerStreamingSvc<M1, Response = M2>,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
B::Error: Into<crate::Error> + Send,
{
+ let content_type = req
+ .headers()
+ .get("content-type")
+ .cloned()
+ .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+ let is_json = content_type == "application/json" || content_type ==
"application/grpc+json";
+
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+ ) = get_codec(is_json);
// Firstly, get grpc_accept_encoding from http_header, get compression
// Secondly, if server enable compression and compression is valid,
this method should compress response
let mut accept_encoding =
CompressionEncoding::from_accept_encoding(req.headers());
@@ -171,8 +209,7 @@ where
Ok(val) => val,
Err(status) => return status.to_http(),
};
-
- let req_stream = req.map(|body| Decoding::new(body,
self.codec.decoder(), compression));
+ let req_stream = req.map(|body| Decoding::new(body, decoder,
compression, is_json));
let (parts, mut body) = Request::from_http(req_stream).into_parts();
let msg = body.try_next().await.unwrap().ok_or_else(|| {
crate::status::Status::new(crate::status::Code::Unknown, "request
wrong".to_string())
@@ -188,12 +225,11 @@ where
Ok(v) => v.into_http().into_parts(),
Err(err) => return err.to_http(),
};
- let resp_body = encode_server(self.codec.encoder(), resp_body,
compression);
+ let resp_body = encode_server(encoder, resp_body, compression,
is_json);
- parts.headers.insert(
- http::header::CONTENT_TYPE,
- http::HeaderValue::from_static("application/grpc"),
- );
+ parts
+ .headers
+ .insert(http::header::CONTENT_TYPE, content_type);
if let Some(encoding) = accept_encoding {
parts
.headers
@@ -209,7 +245,7 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
- S: UnarySvc<T::Decode, Response = T::Encode>,
+ S: UnarySvc<M1, Response = M2>,
B: Body + Send + 'static,
B::Error: Into<crate::Error> + Send,
{
@@ -222,8 +258,18 @@ where
Ok(val) => val,
Err(status) => return status.to_http(),
};
-
- let req_stream = req.map(|body| Decoding::new(body,
self.codec.decoder(), compression));
+ let content_type = req
+ .headers()
+ .get("content-type")
+ .cloned()
+ .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+ let is_json = content_type == "application/json" || content_type ==
"application/grpc+json";
+
+ let (decoder, encoder): (
+ Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+ Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+ ) = get_codec(is_json);
+ let req_stream = req.map(|body| Decoding::new(body, decoder,
compression, is_json));
let (parts, mut body) = Request::from_http(req_stream).into_parts();
let msg = body.try_next().await.unwrap().ok_or_else(|| {
crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
@@ -240,15 +286,15 @@ where
Err(err) => return err.to_http(),
};
let resp_body = encode_server(
- self.codec.encoder(),
+ encoder,
stream::once(future::ready(resp_body)).map(Ok).into_stream(),
accept_encoding,
+ is_json,
);
- parts.headers.insert(
- http::header::CONTENT_TYPE,
- http::HeaderValue::from_static("application/grpc"),
- );
+ parts
+ .headers
+ .insert(http::header::CONTENT_TYPE, content_type);
if let Some(encoding) = accept_encoding {
parts
.headers
@@ -282,7 +328,7 @@ where
}
}
-impl<T> BusinessConfig for TripleServer<T> {
+impl<M1, M2> BusinessConfig for TripleServer<M1, M2> {
fn init() -> Self {
todo!()
}
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index bc6638b..81df8ed 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -24,8 +24,11 @@ http = "0.2"
http-body = "0.4.4"
futures-util = {version = "0.3", default-features = false}
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
-prost-derive = {version = "0.10", optional = true}
-prost = "0.10.4"
+prost-derive = {version = "0.11.9", optional = true}
+prost = "0.11.9"
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
+serde = { version = "1.0.171",features = ["derive"] }
async-trait = "0.1.56"
tokio-stream = "0.1"
dubbo-logger.workspace=true
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 0a2f150..1a21000 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -71,7 +71,7 @@ async fn main() {
};
let (_parts, resp_body) = client_streaming_resp.into_parts();
println!("client streaming, Response: {:?}", resp_body);
-
+ //
let data = vec![
EchoRequest {
message: "msg1 from client".to_string(),
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 16fb163..0a74655 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -1,12 +1,16 @@
-// @generated by apache/dubbo-rust.
-
/// EchoRequest is the request for echo.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(default)]
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoRequest {
#[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
}
/// EchoResponse is the response for echo.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(default)]
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoResponse {
#[prost(string, tag = "1")]
@@ -31,73 +35,55 @@ pub mod echo_client {
inner: TripleClient::new(builder),
}
}
- pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
- self.inner = self.inner.with_cluster(invoker);
- self
- }
/// UnaryEcho is unary echo.
pub async fn unary_echo(
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
- self.inner.unary(request, codec, path, invocation).await
+ self.inner.unary(request, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner
- .server_streaming(request, codec, path, invocation)
- .await
+ self.inner.server_streaming(request, path, invocation).await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner
- .client_streaming(request, codec, path, invocation)
- .await
+ self.inner.client_streaming(request, path, invocation).await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner
- .bidi_streaming(request, codec, path, invocation)
- .await
+ self.inner.bidi_streaming(request, path, invocation).await
}
}
}
@@ -186,10 +172,8 @@ pub mod echo_server {
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server =
+ TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let res = server.unary(UnaryEchoServer { inner }, req).await;
Ok(res)
};
@@ -212,10 +196,8 @@ pub mod echo_server {
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server =
+ TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let res = server
.server_streaming(ServerStreamingEchoServer { inner }, req)
.await;
@@ -241,10 +223,8 @@ pub mod echo_server {
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server =
+ TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let res = server
.client_streaming(ClientStreamingEchoServer { inner }, req)
.await;
@@ -273,10 +253,8 @@ pub mod echo_server {
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server =
+ TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let res = server
.bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
.await;
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index d68cab7..37faf91 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,8 +24,11 @@ http = "0.2"
http-body = "0.4.4"
futures-util = { version = "0.3", default-features = false }
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
-prost-derive = { version = "0.10", optional = true }
-prost = "0.10.4"
+prost-derive = { version = "0.11.9", optional = true }
+prost = "0.11.9"
+serde = { version = "1.0.171",features = ["derive"] }
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
async-trait = "0.1.56"
tokio-stream = "0.1"
dubbo-logger = { path = "../../common/logger" }
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 94e4e53..f143e25 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -76,7 +76,7 @@ impl Greeter for GreeterServerImpl {
request: Request<GreeterRequest>,
) -> Result<Response<GreeterReply>, dubbo::status::Status> {
info!("GreeterServer::greet {:?}", request.metadata);
-
+ println!("{:?}", request.into_inner());
Ok(Response::new(GreeterReply {
message: "hello, dubbo-rust".to_string(),
}))