This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 74ff5fa  Supports grpc+json and the curl tool (#148)
74ff5fa is described below

commit 74ff5fae8750dbcb83a038ca23cfe69e6f57748c
Author: Urara <[email protected]>
AuthorDate: Sat Aug 19 00:30:14 2023 +0800

    Supports grpc+json and the curl tool (#148)
    
    * feat: 支持curl直接访问
    
    将json消息转化为grpc消息,继而交给grpc handle进行处理,并以json格式返回数据
    
    * fix(fix bug):
    
    * refactor: 将生成Codec的逻辑转移到了TripleServer
    
    * Feat: Added support for JSON encoding types(#145)
    
    * fix: Fixed a bug related to compression(#145)
    
    * perf: Improved code reuse-related logic(#145)
    
    Optimized the logic of TripleServer and TripleClient, achieving code
    reuse.
    Merged encoding and encoding_json
    
    * style: Formatted the code according to the cargo fmt standard(#145)
    
    * perf: Resolved the warnings from cargo check(#145)
    
    * perf: Optimized the code structure and removed redundant code(#145)
    
    * perf: Optimized the configuration format as well as the configuration
    loading method.(#145)
    
    * perf: Removed configuration using serialization methods.(#145)
    
    The RPC client now exclusively uses protobuf for serialization.
    
    ---------
    
    Co-authored-by: urara <[email protected]>
---
 Cargo.toml                                        |   3 +-
 dubbo-build/Cargo.toml                            |   4 +-
 dubbo-build/src/client.rs                         |  18 ----
 dubbo-build/src/prost.rs                          |   2 +
 dubbo-build/src/server.rs                         |  32 +-----
 dubbo/Cargo.toml                                  |   2 +-
 dubbo/src/codegen.rs                              |   2 +-
 dubbo/src/triple/client/builder.rs                |   2 +-
 dubbo/src/triple/client/triple.rs                 | 115 ++++++++++++++------
 dubbo/src/triple/decode.rs                        |  63 ++++++++++-
 dubbo/src/triple/encode.rs                        |  49 +++++----
 dubbo/src/triple/server/triple.rs                 | 126 +++++++++++++++-------
 examples/echo/Cargo.toml                          |   7 +-
 examples/echo/src/echo/client.rs                  |   2 +-
 examples/echo/src/generated/grpc.examples.echo.rs |  58 ++++------
 examples/greeter/Cargo.toml                       |   7 +-
 examples/greeter/src/greeter/server.rs            |   2 +-
 17 files changed, 296 insertions(+), 198 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5270ba4..ad39f08 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,6 +57,7 @@ serde_yaml = "0.9.4" # yaml file parser
 once_cell = "1.16.0"
 itertools = "0.10.1"
 bytes = "1.0"
-
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
 
 
diff --git a/dubbo-build/Cargo.toml b/dubbo-build/Cargo.toml
index 3276a68..1ca031d 100644
--- a/dubbo-build/Cargo.toml
+++ b/dubbo-build/Cargo.toml
@@ -10,9 +10,9 @@ repository = "https://github.com/apache/dubbo-rust.git"
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-prost = "0.11.0"
+prost = "0.11.9"
 prettyplease = {version = "0.1"}
 proc-macro2 = "1.0"
 quote = "1.0"
 syn = "1.0"
-prost-build = "0.11.1"
+prost-build = "0.11.9"
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index bfcfe45..af32b64 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -20,8 +20,6 @@ use crate::{Method, Service};
 use proc_macro2::TokenStream;
 use quote::{format_ident, quote};
 
-pub const CODEC_PATH: &str = "dubbo::codegen::ProstCodec";
-
 /// Generate service for client.
 ///
 /// This takes some `Service` and will generate a `TokenStream` that contains
@@ -167,7 +165,6 @@ fn generate_unary<T: Method>(
     compile_well_known_types: bool,
     path: String,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
     let method_name = method.identifier();
@@ -177,14 +174,12 @@ fn generate_unary<T: Method>(
             &mut self,
             request: Request<#request>,
         ) -> Result<Response<#response>, dubbo::status::Status> {
-           let codec = #codec_name::<#request, #response>::default();
            let invocation = RpcInvocation::default()
             .with_service_unique_name(String::from(#service_unique_name))
             .with_method_name(String::from(#method_name));
            let path = http::uri::PathAndQuery::from_static(#path);
            self.inner.unary(
                 request,
-                codec,
                 path,
                 invocation,
             ).await
@@ -199,9 +194,7 @@ fn generate_server_streaming<T: Method>(
     compile_well_known_types: bool,
     path: String,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
-
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
     let method_name = method.identifier();
 
@@ -210,15 +203,12 @@ fn generate_server_streaming<T: Method>(
             &mut self,
             request: Request<#request>,
         ) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
-
-            let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
              .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.server_streaming(
                 request,
-                codec,
                 path,
                 invocation,
             ).await
@@ -233,9 +223,7 @@ fn generate_client_streaming<T: Method>(
     compile_well_known_types: bool,
     path: String,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
-
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
     let method_name = method.identifier();
 
@@ -244,14 +232,12 @@ fn generate_client_streaming<T: Method>(
             &mut self,
             request: impl IntoStreamingRequest<Message = #request>
         ) -> Result<Response<#response>, dubbo::status::Status> {
-            let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
              .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.client_streaming(
                 request,
-                codec,
                 path,
                 invocation,
             ).await
@@ -266,9 +252,7 @@ fn generate_streaming<T: Method>(
     compile_well_known_types: bool,
     path: String,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
-
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
     let method_name = method.identifier();
 
@@ -277,14 +261,12 @@ fn generate_streaming<T: Method>(
             &mut self,
             request: impl IntoStreamingRequest<Message = #request>
         ) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
-            let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
              .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.bidi_streaming(
                 request,
-                codec,
                 path,
                 invocation,
             ).await
diff --git a/dubbo-build/src/prost.rs b/dubbo-build/src/prost.rs
index 3918fd3..1cd92f6 100644
--- a/dubbo-build/src/prost.rs
+++ b/dubbo-build/src/prost.rs
@@ -93,6 +93,8 @@ impl Builder {
             PathBuf::from(std::env::var("OUT_DIR").unwrap())
         };
         config.out_dir(out_dir);
+        config.type_attribute(".", "#[derive(serde::Serialize, 
serde::Deserialize)]");
+        config.type_attribute(".", "#[serde(default)]");
 
         if self.compile_well_known_types {
             config.compile_well_known_types();
diff --git a/dubbo-build/src/server.rs b/dubbo-build/src/server.rs
index 4dbbc9d..3ccb52a 100644
--- a/dubbo-build/src/server.rs
+++ b/dubbo-build/src/server.rs
@@ -21,8 +21,6 @@ use proc_macro2::{Span, TokenStream};
 use quote::quote;
 use syn::{Ident, Lit, LitStr};
 
-pub const CODEC_PATH: &str = "dubbo::codegen::ProstCodec";
-
 /// Generate service for Server.
 ///
 /// This takes some `Service` and will generate a `TokenStream` that contains
@@ -330,8 +328,6 @@ fn generate_unary<T: Method>(
     method_ident: Ident,
     server_trait: Ident,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
     let service_ident = quote::format_ident!("{}Server", method.identifier());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
@@ -354,16 +350,11 @@ fn generate_unary<T: Method>(
                 Box::pin(fut)
             }
         }
-
         let fut = async move {
-            let mut server = TripleServer::new(
-                #codec_name::<#response, #request>::default()
-            );
-
+            let mut server = TripleServer::<#request,#response>::new();
             let res = server.unary(#service_ident { inner }, req).await;
             Ok(res)
         };
-
         Box::pin(fut)
     }
 }
@@ -375,8 +366,6 @@ fn generate_server_streaming<T: Method>(
     method_ident: Ident,
     server_trait: Ident,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
     let service_ident = quote::format_ident!("{}Server", method.identifier());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
@@ -402,12 +391,8 @@ fn generate_server_streaming<T: Method>(
                 Box::pin(fut)
             }
         }
-
         let fut = async move {
-            let mut server = TripleServer::new(
-                #codec_name::<#response, #request>::default()
-            );
-
+            let mut server = TripleServer::<#request,#response>::new();
             let res = server.server_streaming(#service_ident { inner }, 
req).await;
             Ok(res)
         };
@@ -426,7 +411,6 @@ fn generate_client_streaming<T: Method>(
     let service_ident = quote::format_ident!("{}Server", method.identifier());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
 
     quote! {
         #[allow(non_camel_case_types)]
@@ -450,10 +434,7 @@ fn generate_client_streaming<T: Method>(
         }
 
         let fut = async move {
-            let mut server = TripleServer::new(
-                #codec_name::<#response, #request>::default()
-            );
-
+            let mut server = TripleServer::<#request,#response>::new();
             let res = server.client_streaming(#service_ident { inner }, 
req).await;
             Ok(res)
         };
@@ -469,8 +450,6 @@ fn generate_streaming<T: Method>(
     method_ident: Ident,
     server_trait: Ident,
 ) -> TokenStream {
-    let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
-
     let service_ident = quote::format_ident!("{}Server", method.identifier());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
@@ -499,10 +478,7 @@ fn generate_streaming<T: Method>(
         }
 
         let fut = async move {
-            let mut server = TripleServer::new(
-                #codec_name::<#response, #request>::default()
-            );
-
+            let mut server = TripleServer::<#request,#response>::new();
             let res = server.bidi_streaming(#service_ident { inner }, 
req).await;
             Ok(res)
         };
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index d0ab2a4..f51523d 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -21,7 +21,7 @@ argh = "0.1"
 rustls-pemfile = "1.0.0"
 tokio-rustls="0.23.4"
 tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", 
"macros", "net", "signal",  "full" ] }
-prost = "0.10.4"
+prost = "0.11.9"
 async-trait = "0.1.56"
 tower-layer.workspace = true
 bytes.workspace = true
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 412feb9..8d95c21 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -34,7 +34,7 @@ pub use super::{
     registry::{BoxRegistry, Registry},
     triple::{
         client::TripleClient,
-        codec::{prost::ProstCodec, Codec},
+        codec::{prost::ProstCodec, serde_codec::SerdeCodec, Codec},
         decode::Decoding,
         server::{
             service::{ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, 
UnarySvc},
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index 06ecd62..9dae0f9 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -59,7 +59,7 @@ impl ClientBuilder {
             connector: "",
             directory: Some(Box::new(StaticDirectory::new(&host))),
             direct: true,
-            host: host.clone().to_string(),
+            host: host.to_string(),
         }
     }
 
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index 124cfcf..f8e1175 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -21,14 +21,22 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
 
 use aws_smithy_http::body::SdkBody;
 use http::HeaderValue;
+use prost::Message;
+use serde::{Deserialize, Serialize};
 
 use super::builder::ClientBuilder;
-use crate::codegen::RpcInvocation;
+use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec};
 
 use crate::{
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
     protocol::BoxInvoker,
-    triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, 
encode::encode},
+    status::Status,
+    triple::{
+        codec::{Codec, Decoder, Encoder},
+        compression::CompressionEncoding,
+        decode::Decoding,
+        encode::encode,
+    },
 };
 
 #[derive(Debug, Clone, Default)]
@@ -104,12 +112,11 @@ impl TripleClient {
         if let Some(_encoding) = self.send_compression_encoding {
             req.headers_mut()
                 .insert("grpc-encoding", 
http::HeaderValue::from_static("gzip"));
+            req.headers_mut().insert(
+                "grpc-accept-encoding",
+                http::HeaderValue::from_static("gzip"),
+            );
         }
-        req.headers_mut().insert(
-            "grpc-accept-encoding",
-            http::HeaderValue::from_static("gzip"),
-        );
-
         // const (
         //     TripleContentType    = "application/grpc+proto"
         //     TripleUserAgent      = "grpc-go/1.35.0-dev"
@@ -125,23 +132,27 @@ impl TripleClient {
         req
     }
 
-    pub async fn unary<C, M1, M2>(
+    pub async fn unary<M1, M2>(
         &mut self,
         req: Request<M1>,
-        mut codec: C,
         path: http::uri::PathAndQuery,
         invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
-        C: Codec<Encode = M1, Decode = M2>,
-        M1: Send + Sync + 'static,
-        M2: Send + Sync + 'static,
+        M1: Message + Send + Sync + 'static + Serialize,
+        M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + 
Default,
     {
+        let is_json = false;
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+        ) = get_codec(is_json);
         let req = req.map(|m| stream::once(future::ready(m)));
         let body_stream = encode(
-            codec.encoder(),
+            encoder,
             req.into_inner().map(Ok),
             self.send_compression_encoding,
+            is_json,
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
@@ -169,7 +180,7 @@ impl TripleClient {
         match response {
             Ok(v) => {
                 let resp = v.map(|body| {
-                    Decoding::new(body, codec.decoder(), 
self.send_compression_encoding)
+                    Decoding::new(body, decoder, 
self.send_compression_encoding, is_json)
                 });
                 let (mut parts, body) = Response::from_http(resp).into_parts();
 
@@ -194,23 +205,27 @@ impl TripleClient {
         }
     }
 
-    pub async fn bidi_streaming<C, M1, M2>(
+    pub async fn bidi_streaming<M1, M2>(
         &mut self,
         req: impl IntoStreamingRequest<Message = M1>,
-        mut codec: C,
         path: http::uri::PathAndQuery,
         invocation: RpcInvocation,
     ) -> Result<Response<Decoding<M2>>, crate::status::Status>
     where
-        C: Codec<Encode = M1, Decode = M2>,
-        M1: Send + Sync + 'static,
-        M2: Send + Sync + 'static,
+        M1: Message + Send + Sync + 'static + Serialize,
+        M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + 
Default,
     {
+        let is_json = false;
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+        ) = get_codec(is_json);
         let req = req.into_streaming_request();
         let en = encode(
-            codec.encoder(),
+            encoder,
             req.into_inner().map(Ok),
             self.send_compression_encoding,
+            is_json,
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
@@ -237,7 +252,7 @@ impl TripleClient {
         match response {
             Ok(v) => {
                 let resp = v.map(|body| {
-                    Decoding::new(body, codec.decoder(), 
self.send_compression_encoding)
+                    Decoding::new(body, decoder, 
self.send_compression_encoding, is_json)
                 });
 
                 Ok(Response::from_http(resp))
@@ -246,23 +261,27 @@ impl TripleClient {
         }
     }
 
-    pub async fn client_streaming<C, M1, M2>(
+    pub async fn client_streaming<M1, M2>(
         &mut self,
         req: impl IntoStreamingRequest<Message = M1>,
-        mut codec: C,
         path: http::uri::PathAndQuery,
         invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
-        C: Codec<Encode = M1, Decode = M2>,
-        M1: Send + Sync + 'static,
-        M2: Send + Sync + 'static,
+        M1: Message + Send + Sync + 'static + Serialize,
+        M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + 
Default,
     {
+        let is_json = false;
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+        ) = get_codec(is_json);
         let req = req.into_streaming_request();
         let en = encode(
-            codec.encoder(),
+            encoder,
             req.into_inner().map(Ok),
             self.send_compression_encoding,
+            is_json,
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
@@ -290,7 +309,7 @@ impl TripleClient {
         match response {
             Ok(v) => {
                 let resp = v.map(|body| {
-                    Decoding::new(body, codec.decoder(), 
self.send_compression_encoding)
+                    Decoding::new(body, decoder, 
self.send_compression_encoding, is_json)
                 });
                 let (mut parts, body) = Response::from_http(resp).into_parts();
 
@@ -315,23 +334,27 @@ impl TripleClient {
         }
     }
 
-    pub async fn server_streaming<C, M1, M2>(
+    pub async fn server_streaming<M1, M2>(
         &mut self,
         req: Request<M1>,
-        mut codec: C,
         path: http::uri::PathAndQuery,
         invocation: RpcInvocation,
     ) -> Result<Response<Decoding<M2>>, crate::status::Status>
     where
-        C: Codec<Encode = M1, Decode = M2>,
-        M1: Send + Sync + 'static,
-        M2: Send + Sync + 'static,
+        M1: Message + Send + Sync + 'static + Serialize,
+        M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + 
Default,
     {
+        let is_json = false;
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+        ) = get_codec(is_json);
         let req = req.map(|m| stream::once(future::ready(m)));
         let en = encode(
-            codec.encoder(),
+            encoder,
             req.into_inner().map(Ok),
             self.send_compression_encoding,
+            is_json,
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
@@ -357,7 +380,7 @@ impl TripleClient {
         match response {
             Ok(v) => {
                 let resp = v.map(|body| {
-                    Decoding::new(body, codec.decoder(), 
self.send_compression_encoding)
+                    Decoding::new(body, decoder, 
self.send_compression_encoding, is_json)
                 });
 
                 Ok(Response::from_http(resp))
@@ -366,3 +389,25 @@ impl TripleClient {
         }
     }
 }
+
+pub fn get_codec<M1, M2>(
+    is_json: bool,
+) -> (
+    Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
+    Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
+)
+where
+    M1: Message + Send + Sync + 'static + Serialize,
+    M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
+{
+    match is_json {
+        true => {
+            let mut codec = SerdeCodec::<M1, M2>::default();
+            (Box::new(codec.decoder()), Box::new(codec.encoder()))
+        }
+        false => {
+            let mut codec = ProstCodec::<M1, M2>::default();
+            (Box::new(codec.decoder()), Box::new(codec.encoder()))
+        }
+    }
+}
diff --git a/dubbo/src/triple/decode.rs b/dubbo/src/triple/decode.rs
index 07c1160..875bbea 100644
--- a/dubbo/src/triple/decode.rs
+++ b/dubbo/src/triple/decode.rs
@@ -38,21 +38,27 @@ pub struct Decoding<T> {
     trailers: Option<Metadata>,
     compress: Option<CompressionEncoding>,
     decompress_buf: BytesMut,
+    is_json: bool,
 }
 
 #[derive(PartialEq)]
 enum State {
     ReadHeader,
+    ReadJSON,
     ReadBody { len: usize, is_compressed: bool },
     Error,
 }
 
 impl<T> Decoding<T> {
-    pub fn new<B, D>(body: B, decoder: D, compress: 
Option<CompressionEncoding>) -> Self
+    pub fn new<B>(
+        body: B,
+        decoder: Box<dyn Decoder<Item = T, Error = crate::status::Status> + 
Send + 'static>,
+        compress: Option<CompressionEncoding>,
+        is_json: bool,
+    ) -> Self
     where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error>,
-        D: Decoder<Item = T, Error = crate::status::Status> + Send + 'static,
     {
         Self {
             state: State::ReadHeader,
@@ -65,11 +71,12 @@ impl<T> Decoding<T> {
                     )
                 })
                 .boxed_unsync(),
-            decoder: Box::new(decoder),
+            decoder,
             buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE),
             trailers: None,
             compress,
             decompress_buf: BytesMut::new(),
+            is_json,
         }
     }
 
@@ -91,7 +98,47 @@ impl<T> Decoding<T> {
         trailer.map(|data| data.map(Metadata::from_headers))
     }
 
-    pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status> 
{
+    pub fn decode_json(&mut self) -> Result<Option<T>, crate::status::Status> {
+        if self.state == State::ReadHeader {
+            self.state = State::ReadJSON;
+            return Ok(None);
+        }
+        if let State::ReadJSON = self.state {
+            if self.buf.is_empty() {
+                return Ok(None);
+            }
+            match self.compress {
+                None => self.decompress_buf = self.buf.clone(),
+                Some(compress) => {
+                    let len = self.buf.len();
+                    if let Err(err) =
+                        decompress(compress, &mut self.buf, &mut 
self.decompress_buf, len)
+                    {
+                        return Err(crate::status::Status::new(
+                            crate::status::Code::Internal,
+                            err.to_string(),
+                        ));
+                    }
+                }
+            }
+            let len = self.decompress_buf.len();
+            let decoding_result = self
+                .decoder
+                .decode(&mut DecodeBuf::new(&mut self.decompress_buf, len));
+
+            return match decoding_result {
+                Ok(Some(r)) => {
+                    self.state = State::ReadHeader;
+                    Ok(Some(r))
+                }
+                Ok(None) => Ok(None),
+                Err(err) => Err(err),
+            };
+        }
+        Ok(None)
+    }
+
+    pub fn decode_proto(&mut self) -> Result<Option<T>, crate::status::Status> 
{
         if self.state == State::ReadHeader {
             // buffer is full
             if self.buf.remaining() < super::consts::HEADER_SIZE {
@@ -166,6 +213,14 @@ impl<T> Decoding<T> {
 
         Ok(None)
     }
+
+    pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status> 
{
+        if self.is_json {
+            self.decode_json()
+        } else {
+            self.decode_proto()
+        }
+    }
 }
 
 impl<T> Stream for Decoding<T> {
diff --git a/dubbo/src/triple/encode.rs b/dubbo/src/triple/encode.rs
index b837463..1c971d6 100644
--- a/dubbo/src/triple/encode.rs
+++ b/dubbo/src/triple/encode.rs
@@ -28,13 +28,13 @@ use crate::triple::codec::{EncodeBuf, Encoder};
 
 #[allow(unused_must_use)]
 pub fn encode<E, B>(
-    mut encoder: E,
+    mut encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
     resp_body: B,
     compression_encoding: Option<CompressionEncoding>,
+    is_json: bool,
 ) -> impl TryStream<Ok = Bytes, Error = Status>
 where
-    E: Encoder<Error = Status>,
-    B: Stream<Item = Result<E::Item, Status>>,
+    B: Stream<Item = Result<E, Status>>,
 {
     async_stream::stream! {
         let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
@@ -48,12 +48,13 @@ where
         loop {
             match resp_body.next().await {
                 Some(Ok(item)) => {
-                    // 编码数据到缓冲中
-                    buf.reserve(super::consts::HEADER_SIZE);
+                    if !is_json {
+                        buf.reserve(super::consts::HEADER_SIZE);
                     unsafe {
                         buf.advance_mut(super::consts::HEADER_SIZE);
+                        }
                     }
-
+                    // 编码数据到缓冲中
                     if enable_compress {
                         uncompression_buf.clear();
 
@@ -66,16 +67,21 @@ where
                     } else {
                         encoder.encode(item, &mut EncodeBuf::new(&mut 
buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, 
"encode error".to_string()));
                     }
-
-
-                    let len = buf.len() - super::consts::HEADER_SIZE;
-                    {
+                    let result=match is_json{
+                        true=>{
+                            buf.clone()
+                        }
+                        false=>{
+                            let len = buf.len() - super::consts::HEADER_SIZE;
+                        {
                         let mut buf = &mut buf[..super::consts::HEADER_SIZE];
                         buf.put_u8(enable_compress as u8);
                         buf.put_u32(len as u32);
-                    }
-
-                    yield Ok(buf.split_to(len + 
super::consts::HEADER_SIZE).freeze());
+                        }
+                        buf.split_to(len + super::consts::HEADER_SIZE)
+                        }
+                    };
+                    yield Ok(result.freeze());
                 },
                 Some(Err(err)) => yield Err(err.into()),
                 None => break,
@@ -85,28 +91,28 @@ where
 }
 
 pub fn encode_server<E, B>(
-    encoder: E,
+    encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
     body: B,
     compression_encoding: Option<CompressionEncoding>,
+    is_json: bool,
 ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
 where
-    E: Encoder<Error = Status>,
-    B: Stream<Item = Result<E::Item, Status>>,
+    B: Stream<Item = Result<E, Status>>,
 {
-    let s = encode(encoder, body, compression_encoding).into_stream();
+    let s = encode(encoder, body, compression_encoding, is_json).into_stream();
     EncodeBody::new_server(s)
 }
 
 pub fn encode_client<E, B>(
-    encoder: E,
+    encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
     body: B,
     compression_encoding: Option<CompressionEncoding>,
+    is_json: bool,
 ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
 where
-    E: Encoder<Error = Status>,
-    B: Stream<Item = E::Item>,
+    B: Stream<Item = E>,
 {
-    let s = encode(encoder, body.map(Ok), compression_encoding).into_stream();
+    let s = encode(encoder, body.map(Ok), compression_encoding, 
is_json).into_stream();
     EncodeBody::new_client(s)
 }
 
@@ -115,6 +121,7 @@ enum Role {
     Server,
     Client,
 }
+
 #[pin_project]
 pub struct EncodeBody<S> {
     #[pin]
diff --git a/dubbo/src/triple/server/triple.rs 
b/dubbo/src/triple/server/triple.rs
index 2c0626c..2586e64 100644
--- a/dubbo/src/triple/server/triple.rs
+++ b/dubbo/src/triple/server/triple.rs
@@ -16,12 +16,18 @@
  */
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
+use http::HeaderValue;
 use http_body::Body;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use std::marker::PhantomData;
 
 use crate::{
     invocation::Request,
+    status::Status,
     triple::{
-        codec::Codec,
+        client::triple::get_codec,
+        codec::{Decoder, Encoder},
         compression::{CompressionEncoding, COMPRESSIONS},
         decode::Decoding,
         encode::encode_server,
@@ -34,23 +40,24 @@ use dubbo_config::BusinessConfig;
 pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
 pub const GRPC_ENCODING: &str = "grpc-encoding";
 
-pub struct TripleServer<T> {
-    codec: T,
+pub struct TripleServer<M1, M2> {
+    _pd: PhantomData<(M1, M2)>,
     compression: Option<CompressionEncoding>,
 }
 
-impl<T> TripleServer<T> {
-    pub fn new(codec: T) -> Self {
+impl<M1, M2> TripleServer<M1, M2> {
+    pub fn new() -> Self {
         Self {
-            codec,
-            compression: None,
+            _pd: PhantomData,
+            compression: Some(CompressionEncoding::Gzip),
         }
     }
 }
 
-impl<T> TripleServer<T>
+impl<M1, M2> TripleServer<M1, M2>
 where
-    T: Codec,
+    M1: Message + for<'a> Deserialize<'a> + Default + 'static,
+    M2: Message + Serialize + Default + 'static,
 {
     pub async fn client_streaming<S, B>(
         &mut self,
@@ -58,10 +65,20 @@ where
         req: http::Request<B>,
     ) -> http::Response<BoxBody>
     where
-        S: ClientStreamingSvc<T::Decode, Response = T::Encode>,
+        S: ClientStreamingSvc<M1, Response = M2>,
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
+        let content_type = req
+            .headers()
+            .get("content-type")
+            .cloned()
+            .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+        let is_json = content_type == "application/json" || content_type == "application/grpc+json";
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+        ) = get_codec(is_json);
         let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
         if self.compression.is_none() || accept_encoding.is_none() {
             accept_encoding = None;
@@ -73,7 +90,7 @@ where
             Err(status) => return status.to_http(),
         };
 
-        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
+        let req_stream = req.map(|body| Decoding::new(body, decoder, compression, is_json));
 
         let resp = service.call(Request::from_http(req_stream)).await;
 
@@ -83,15 +100,15 @@ where
         };
 
         let resp_body = encode_server(
-            self.codec.encoder(),
+            encoder,
             stream::once(future::ready(resp_body)).map(Ok).into_stream(),
             accept_encoding,
+            is_json,
         );
 
-        parts.headers.insert(
-            http::header::CONTENT_TYPE,
-            http::HeaderValue::from_static("application/grpc"),
-        );
+        parts
+            .headers
+            .insert(http::header::CONTENT_TYPE, content_type);
         if let Some(encoding) = accept_encoding {
             parts
                 .headers
@@ -107,11 +124,22 @@ where
         req: http::Request<B>,
     ) -> http::Response<BoxBody>
     where
-        S: StreamingSvc<T::Decode, Response = T::Encode>,
+        S: StreamingSvc<M1, Response = M2>,
         S::ResponseStream: Send + 'static,
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
+        let content_type = req
+            .headers()
+            .get("content-type")
+            .cloned()
+            .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+        let is_json = content_type == "application/json" || content_type == "application/grpc+json";
+
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+        ) = get_codec(is_json);
         // Firstly, get grpc_accept_encoding from http_header, get compression
         // Secondly, if server enable compression and compression is valid, this method should compress response
         let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
@@ -125,7 +153,7 @@ where
             Err(status) => return status.to_http(),
         };
 
-        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
+        let req_stream = req.map(|body| Decoding::new(body, decoder, compression, is_json));
 
         let resp = service.call(Request::from_http(req_stream)).await;
 
@@ -133,12 +161,11 @@ where
             Ok(v) => v.into_http().into_parts(),
             Err(err) => return err.to_http(),
         };
-        let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
+        let resp_body = encode_server(encoder, resp_body, compression, is_json);
 
-        parts.headers.insert(
-            http::header::CONTENT_TYPE,
-            http::HeaderValue::from_static("application/grpc"),
-        );
+        parts
+            .headers
+            .insert(http::header::CONTENT_TYPE, content_type);
         if let Some(encoding) = accept_encoding {
             parts
                 .headers
@@ -154,11 +181,22 @@ where
         req: http::Request<B>,
     ) -> http::Response<BoxBody>
     where
-        S: ServerStreamingSvc<T::Decode, Response = T::Encode>,
+        S: ServerStreamingSvc<M1, Response = M2>,
         S::ResponseStream: Send + 'static,
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
+        let content_type = req
+            .headers()
+            .get("content-type")
+            .cloned()
+            .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+        let is_json = content_type == "application/json" || content_type == "application/grpc+json";
+
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+        ) = get_codec(is_json);
         // Firstly, get grpc_accept_encoding from http_header, get compression
         // Secondly, if server enable compression and compression is valid, this method should compress response
         let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
@@ -171,8 +209,7 @@ where
             Ok(val) => val,
             Err(status) => return status.to_http(),
         };
-
-        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
+        let req_stream = req.map(|body| Decoding::new(body, decoder, compression, is_json));
         let (parts, mut body) = Request::from_http(req_stream).into_parts();
         let msg = body.try_next().await.unwrap().ok_or_else(|| {
             crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
@@ -188,12 +225,11 @@ where
             Ok(v) => v.into_http().into_parts(),
             Err(err) => return err.to_http(),
         };
-        let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
+        let resp_body = encode_server(encoder, resp_body, compression, is_json);
 
-        parts.headers.insert(
-            http::header::CONTENT_TYPE,
-            http::HeaderValue::from_static("application/grpc"),
-        );
+        parts
+            .headers
+            .insert(http::header::CONTENT_TYPE, content_type);
         if let Some(encoding) = accept_encoding {
             parts
                 .headers
@@ -209,7 +245,7 @@ where
         req: http::Request<B>,
     ) -> http::Response<BoxBody>
     where
-        S: UnarySvc<T::Decode, Response = T::Encode>,
+        S: UnarySvc<M1, Response = M2>,
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
@@ -222,8 +258,18 @@ where
             Ok(val) => val,
             Err(status) => return status.to_http(),
         };
-
-        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
+        let content_type = req
+            .headers()
+            .get("content-type")
+            .cloned()
+            .unwrap_or(HeaderValue::from_str("application/json").unwrap());
+        let is_json = content_type == "application/json" || content_type == "application/grpc+json";
+
+        let (decoder, encoder): (
+            Box<dyn Decoder<Item = M1, Error = Status> + Send + 'static>,
+            Box<dyn Encoder<Error = Status, Item = M2> + Send + 'static>,
+        ) = get_codec(is_json);
+        let req_stream = req.map(|body| Decoding::new(body, decoder, compression, is_json));
         let (parts, mut body) = Request::from_http(req_stream).into_parts();
         let msg = body.try_next().await.unwrap().ok_or_else(|| {
             crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
@@ -240,15 +286,15 @@ where
             Err(err) => return err.to_http(),
         };
         let resp_body = encode_server(
-            self.codec.encoder(),
+            encoder,
             stream::once(future::ready(resp_body)).map(Ok).into_stream(),
             accept_encoding,
+            is_json,
         );
 
-        parts.headers.insert(
-            http::header::CONTENT_TYPE,
-            http::HeaderValue::from_static("application/grpc"),
-        );
+        parts
+            .headers
+            .insert(http::header::CONTENT_TYPE, content_type);
         if let Some(encoding) = accept_encoding {
             parts
                 .headers
@@ -282,7 +328,7 @@ where
     }
 }
 
-impl<T> BusinessConfig for TripleServer<T> {
+impl<M1, M2> BusinessConfig for TripleServer<M1, M2> {
     fn init() -> Self {
         todo!()
     }
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index bc6638b..81df8ed 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -24,8 +24,11 @@ http = "0.2"
 http-body = "0.4.4"
 futures-util = {version = "0.3", default-features = false}
 tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
-prost-derive = {version = "0.10", optional = true}
-prost = "0.10.4"
+prost-derive = {version = "0.11.9", optional = true}
+prost = "0.11.9"
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
+serde = { version = "1.0.171",features = ["derive"]  }
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 dubbo-logger.workspace=true
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 0a2f150..1a21000 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -71,7 +71,7 @@ async fn main() {
     };
     let (_parts, resp_body) = client_streaming_resp.into_parts();
     println!("client streaming, Response: {:?}", resp_body);
-
+    //
     let data = vec![
         EchoRequest {
             message: "msg1 from client".to_string(),
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 16fb163..0a74655 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -1,12 +1,16 @@
-// @generated by apache/dubbo-rust.
-
 /// EchoRequest is the request for echo.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(default)]
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EchoRequest {
     #[prost(string, tag = "1")]
     pub message: ::prost::alloc::string::String,
 }
 /// EchoResponse is the response for echo.
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(default)]
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct EchoResponse {
     #[prost(string, tag = "1")]
@@ -31,73 +35,55 @@ pub mod echo_client {
                 inner: TripleClient::new(builder),
             }
         }
-        pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
-            self.inner = self.inner.with_cluster(invoker);
-            self
-        }
         /// UnaryEcho is unary echo.
         pub async fn unary_echo(
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec =
-                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("UnaryEcho"));
             let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
-            self.inner.unary(request, codec, path, invocation).await
+            self.inner.unary(request, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
         pub async fn server_streaming_echo(
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec =
-                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
-            self.inner
-                .server_streaming(request, codec, path, invocation)
-                .await
+            self.inner.server_streaming(request, path, invocation).await
         }
         /// ClientStreamingEcho is client side streaming.
         pub async fn client_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec =
-                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
-            self.inner
-                .client_streaming(request, codec, path, invocation)
-                .await
+            self.inner.client_streaming(request, path, invocation).await
         }
         /// BidirectionalStreamingEcho is bidi streaming.
         pub async fn bidirectional_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec =
-                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
-            self.inner
-                .bidi_streaming(request, codec, path, invocation)
-                .await
+            self.inner.bidi_streaming(request, path, invocation).await
         }
     }
 }
@@ -186,10 +172,8 @@ pub mod echo_server {
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
-                            super::EchoResponse,
-                            super::EchoRequest,
-                        >::default());
+                        let mut server =
+                            TripleServer::<super::EchoRequest, super::EchoResponse>::new();
                         let res = server.unary(UnaryEchoServer { inner }, req).await;
                         Ok(res)
                     };
@@ -212,10 +196,8 @@ pub mod echo_server {
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
-                            super::EchoResponse,
-                            super::EchoRequest,
-                        >::default());
+                        let mut server =
+                            TripleServer::<super::EchoRequest, super::EchoResponse>::new();
                         let res = server
                             .server_streaming(ServerStreamingEchoServer { inner }, req)
                             .await;
@@ -241,10 +223,8 @@ pub mod echo_server {
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
-                            super::EchoResponse,
-                            super::EchoRequest,
-                        >::default());
+                        let mut server =
+                            TripleServer::<super::EchoRequest, super::EchoResponse>::new();
                         let res = server
                             .client_streaming(ClientStreamingEchoServer { inner }, req)
                             .await;
@@ -273,10 +253,8 @@ pub mod echo_server {
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
-                            super::EchoResponse,
-                            super::EchoRequest,
-                        >::default());
+                        let mut server =
+                            TripleServer::<super::EchoRequest, super::EchoResponse>::new();
                         let res = server
                             .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
                             .await;
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index d68cab7..37faf91 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,8 +24,11 @@ http = "0.2"
 http-body = "0.4.4"
 futures-util = { version = "0.3", default-features = false }
 tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
-prost-derive = { version = "0.10", optional = true }
-prost = "0.10.4"
+prost-derive = { version = "0.11.9", optional = true }
+prost = "0.11.9"
+serde = { version = "1.0.171",features = ["derive"]  }
+prost-serde = "0.3.0"
+prost-serde-derive = "0.1.2"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 dubbo-logger = { path = "../../common/logger" }
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 94e4e53..f143e25 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -76,7 +76,7 @@ impl Greeter for GreeterServerImpl {
         request: Request<GreeterRequest>,
     ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
         info!("GreeterServer::greet {:?}", request.metadata);
-
+        println!("{:?}", request.into_inner());
         Ok(Response::new(GreeterReply {
             message: "hello, dubbo-rust".to_string(),
         }))

Reply via email to