This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-idl in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit a44357d0aad89f04bc9937c190d954a6e070d8a4 Author: johankoi <[email protected]> AuthorDate: Thu Jul 14 04:25:27 2022 +0800 feat: gen rpc code form proto service defined Signed-off-by: johankoi <[email protected]> --- Cargo.toml | 6 +- dubbo-build/Cargo.toml | 12 ++ dubbo-build/src/generator.rs | 136 +++++++++++++++++++++ dubbo-build/src/lib.rs | 8 ++ examples/grpc-gen/Cargo.toml | 20 +++ examples/grpc-gen/build.rs | 12 ++ .../pb_message => grpc-gen}/pb/greeter.proto | 0 examples/grpc-gen/src/greeter.rs | 73 +++++++++++ examples/grpc-gen/src/main.rs | 12 ++ examples/protobuf-transport/Cargo.toml | 30 +++++ examples/protobuf-transport/build.rs | 9 ++ .../pb/person.proto | 0 examples/protobuf-transport/src/client.rs | 33 +++++ examples/protobuf-transport/src/lib.rs | 7 ++ .../src/pb => protobuf-transport/src}/person.rs | 0 .../main.rs => protobuf-transport/src/server.rs} | 2 +- examples/protobuf/client/Cargo.toml | 14 --- examples/protobuf/client/src/main.rs | 32 ----- examples/protobuf/pb_message/Cargo.toml | 13 -- examples/protobuf/pb_message/build.rs | 18 --- examples/protobuf/pb_message/src/lib.rs | 37 ------ examples/protobuf/pb_message/src/pb/greeter.rs | 10 -- examples/protobuf/pb_message/src/pb/mod.rs | 5 - examples/protobuf/server/Cargo.toml | 10 -- xds/Cargo.toml | 3 + xds/src/error.rs | 136 +++++++++++++++++++++ xds/src/lib.rs | 17 +++ xds/src/request.rs | 131 ++++++++++++++++++++ xds/src/response.rs | 131 ++++++++++++++++++++ xds/src/util.rs | 9 ++ xds/src/wrapper.rs | 110 +++++++++++++++++ 31 files changed, 893 insertions(+), 143 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0fe1fd2..3016158 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,12 @@ [workspace] members = [ "xds", + "dubbo-build", "protocol", "registry", "metadata", "common", "config", - "examples/protobuf/client", - "examples/protobuf/server", - "examples/protobuf/pb_message", + "examples/grpc-gen", + "examples/protobuf-transport", ] \ No newline at end of file diff --git a/dubbo-build/Cargo.toml b/dubbo-build/Cargo.toml new file mode 100644 index 0000000..5839555 --- /dev/null +++ b/dubbo-build/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "dubbo-build" +version = "0.1.0" +edition = "2021" + +[features] +generator-code = ["prost-build"] + + + +[dependencies] +prost-build = { version = "0.10.4", optional = true } \ No newline at end of file diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs new file mode 100644 index 0000000..05e25cf --- /dev/null +++ b/dubbo-build/src/generator.rs @@ -0,0 +1,136 @@ +use prost_build::{Method, Service, ServiceGenerator}; + +#[derive(Default)] +pub struct CodeGenerator {} + +impl CodeGenerator { + pub fn new() -> CodeGenerator { Default::default() } + + fn generate_module_name(&self) -> &str { "xds" } + + fn generate_type_aliases(&mut self, buf: &mut String) { + buf.push_str(&format!( + "\n\ + pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\ + pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n", + self.generate_module_name())); + } + + fn generate_main_trait(&self, service: &Service, buf: &mut String) { + buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n"); + + service.comments.append_with_indent(0, buf); + buf.push_str(&format!("pub trait {} {{", service.name)); + for method in service.methods.iter() { + buf.push_str("\n"); + method.comments.append_with_indent(1, buf); + buf.push_str(&format!(" {};\n", self.method_sig(method))); + } + buf.push_str("}\n"); + } + + fn method_sig(&self, method: &Method) -> String { + format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>", + method.name, method.input_type, method.output_type) + } + + fn generate_client_struct(&self, service: &Service, buf: &mut String) { + buf.push_str(&format!( + "\n\ + pub struct {0}Client {{\n \ + pub hyper_client: {1}::wrapper::HyperClient \n\ + }}\n",service.name, self.generate_module_name())); + } + + fn generate_client_impl(&self, service: &Service, buf: &mut String) { + buf.push_str(&format!( + "\n\ + impl {0}Client {{\n \ + pub fn new(root_url: &str) -> {0}Client {{\n \ + {0}Client {{\n \ + hyper_client: {1}::wrapper::HyperClient::new(root_url) \n \ + }}\n \ + }}\n\ + }}\n", service.name, self.generate_module_name())); + + buf.push_str("\n#[async_trait]"); + buf.push_str(&format!("\nimpl {0} for {0}Client {{", service.name)); + + for method in service.methods.iter() { + buf.push_str(&format!( + "\n {} {{\n \ + self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n \ + }}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name)); + } + buf.push_str("}\n"); + } + + fn generate_server_struct(&self, service: &Service, buf: &mut String) { + buf.push_str(&format!( + "\n\ + pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n \ + pub hyper_server: {1}::wrapper::HyperServer<T> \n\ + }}\n",service.name, self.generate_module_name())); + } + + fn generate_server_impl(&self, service: &Service, buf: &mut String) { + buf.push_str(&format!( + "\n\ + impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n \ + pub fn new(&self, service: T) -> {0}Server<T> {{\n \ + {0}Server {{\n \ + hyper_server: {1}::wrapper::HyperServer::new(service) \n \ + }}\n \ + }}\n\ + }}\n", + service.name, self.generate_module_name())); + + buf.push_str(&format!( + "\n\ + impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n \ + fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n \ + use ::futures::Future;\n \ + let trait_object_service = self.hyper_server.service.clone();\n \ + match (req.method.clone(), req.uri.path()) {{", + service.name, self.generate_module_name())); + + // Make match arms for each type + for method in service.methods.iter() { + buf.push_str(&format!( + "\n \ + (::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n \ + Box::pin(async move {{ \n \ + let proto_req = req.to_proto().unwrap(); \n \ + let resp = trait_object_service.{3}(proto_req).await.unwrap(); \n \ + let proto_resp = resp.to_proto_raw(); \n \ + proto_resp \n \ + }}) \n \ + }},", service.package, service.proto_name, method.proto_name, method.name)); + } + // Final 404 arm and end fn + buf.push_str(&format!( + "\n \ + _ => {{\n \ + Box::pin(async move {{ \n \ + Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n \ + }}) \n \ + }}\n \ + }}\n \ + }}\n\ + }}", self.generate_module_name())); + } +} + +impl ServiceGenerator for CodeGenerator { + fn generate(&mut self, service: Service, buf: &mut String) { + self.generate_type_aliases(buf); + self.generate_main_trait(&service, buf); + self.generate_client_struct(&service, buf); + self.generate_client_impl(&service, buf); + self.generate_server_struct(&service, buf); + self.generate_server_impl(&service, buf); + } + + fn finalize(&mut self, buf: &mut String) { + } +} diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs new file mode 100644 index 0000000..329eb39 --- /dev/null +++ b/dubbo-build/src/lib.rs @@ -0,0 +1,8 @@ + + +#[cfg(feature = "generator-code")] +extern crate prost_build; +#[cfg(feature = "generator-code")] +mod generator; +#[cfg(feature = "generator-code")] +pub use generator::CodeGenerator; diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml new file mode 100644 index 0000000..335c81c --- /dev/null +++ b/examples/grpc-gen/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "grpc-gen" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3.21" +prost = "0.10.4" +async-trait = { version = "0.1.56" } +xds = { version = "0.1.0", path = "../../xds" } + +hyper = { version = "0.14.19", features = ["full"] } + + + +[build-dependencies] +prost-build = "0.10.4" +dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] } \ No newline at end of file diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs new file mode 100644 index 0000000..0098c18 --- /dev/null +++ b/examples/grpc-gen/build.rs @@ -0,0 +1,12 @@ +use std::io::Result; +use prost_build::Config; + +fn main() -> Result<()> { + let mut conf = Config::new(); + let mut gen = dubbo_build::CodeGenerator::new(); + conf.service_generator(Box::new(gen)); + conf.out_dir("src/"); + conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap(); + + Ok(()) +} \ No newline at end of file diff --git a/examples/protobuf/pb_message/pb/greeter.proto b/examples/grpc-gen/pb/greeter.proto similarity index 100% rename from examples/protobuf/pb_message/pb/greeter.proto rename to examples/grpc-gen/pb/greeter.proto diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs new file mode 100644 index 0000000..3ecb8f2 --- /dev/null +++ b/examples/grpc-gen/src/greeter.rs @@ -0,0 +1,73 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloRequest { + #[prost(string, tag="1")] + pub name: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloResponse { + #[prost(string, tag="1")] + pub message: ::prost::alloc::string::String, +} + +pub type DBReq<I> = xds::request::ServiceRequest<I>; +pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>; + +use async_trait::async_trait; + +#[async_trait] +pub trait Greeter { + async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>; +} + +pub struct GreeterClient { + pub hyper_client: xds::wrapper::HyperClient +} + +impl GreeterClient { + pub fn new(root_url: &str) -> GreeterClient { + GreeterClient { + hyper_client: xds::wrapper::HyperClient::new(root_url) + } + } +} + +#[async_trait] +impl Greeter for GreeterClient { + async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> { + self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await + } +} + +pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> { + pub hyper_server: xds::wrapper::HyperServer<T> +} + +impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> { + pub fn new(&self, service: T) -> GreeterServer<T> { + GreeterServer { + hyper_server: xds::wrapper::HyperServer::new(service) + } + } +} + +impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> { + fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> { + use ::futures::Future; + let trait_object_service = self.hyper_server.service.clone(); + match (req.method.clone(), req.uri.path()) { + (::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => { + Box::pin(async move { + let proto_req = req.to_proto().unwrap(); + let resp = trait_object_service.say_hello(proto_req).await.unwrap(); + let proto_resp = resp.to_proto_raw(); + proto_resp + }) + }, + _ => { + Box::pin(async move { + Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw()) + }) + } + } + } +} \ No newline at end of file diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs new file mode 100644 index 0000000..68b341a --- /dev/null +++ b/examples/grpc-gen/src/main.rs @@ -0,0 +1,12 @@ + +mod greeter; +use greeter::*; + + +fn main() { + + let client = GreeterClient::new("0.0.0.0:8080"); + let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()}); + client.say_hello(hello_req); + +} diff --git a/examples/protobuf-transport/Cargo.toml b/examples/protobuf-transport/Cargo.toml new file mode 100644 index 0000000..8a35f90 --- /dev/null +++ b/examples/protobuf-transport/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "protobuf-transport" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +xds = { version = "0.1.0", path = "../../xds" } +tokio = {version = "1.9.0", features = ["full"]} +hyper = { version = "0.14", features = ["full"] } +futures = "0.3.21" +prost = "0.10.4" + + +[lib] +path = "./src/lib.rs" + +[[bin]] +name="server" +path="./src/server.rs" + +[[bin]] +name="client" +path="./src/client.rs" + + +[build-dependencies] +prost-build = "0.10.4" +dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] } \ No newline at end of file diff --git a/examples/protobuf-transport/build.rs b/examples/protobuf-transport/build.rs new file mode 100644 index 0000000..6459de7 --- /dev/null +++ b/examples/protobuf-transport/build.rs @@ -0,0 +1,9 @@ +use std::io::Result; +use prost_build::Config; + +fn main() -> Result<()> { + Config::new() + .out_dir("src/") + .compile_protos(&["pb/person.proto"], &["pb/"])?; + Ok(()) +} \ No newline at end of file diff --git a/examples/protobuf/pb_message/pb/person.proto b/examples/protobuf-transport/pb/person.proto similarity index 100% rename from examples/protobuf/pb_message/pb/person.proto rename to examples/protobuf-transport/pb/person.proto diff --git a/examples/protobuf-transport/src/client.rs b/examples/protobuf-transport/src/client.rs new file mode 100644 index 0000000..4d598bb --- /dev/null +++ b/examples/protobuf-transport/src/client.rs @@ -0,0 +1,33 @@ +use std::collections::hash_map::HashMap; +use xds::{client::RpcClient}; +use prost::Message; +use hyper::body::Buf; +use hyper::{Request, Response}; + +pub mod person; +use person::Person; + + +#[tokio::main] +async fn main() { + + let mut client = RpcClient::new(String::from("http://127.0.0.1:8972")); + + let service_path = String::from("helloworld"); + let service_method = String::from("hello"); + let metadata = HashMap::new(); + + let mut person = Person::default(); + person.name = "guomiwu".to_string(); + let pbData = person.encode_to_vec(); + + let callResult = client.call(service_path, service_method, &metadata, pbData).await; + let resp = callResult.unwrap(); + + // asynchronously aggregate the chunks of the body + let body = hyper::body::aggregate(resp).await; + let data = body.unwrap().chunk().to_vec(); + + let resPerson = Person::decode(data.as_ref()).unwrap(); + println!("resPerson={:?}", resPerson.name); +} \ No newline at end of file diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs new file mode 100644 index 0000000..959e687 --- /dev/null +++ b/examples/protobuf-transport/src/lib.rs @@ -0,0 +1,7 @@ + +pub mod person; +pub use person::*; + +fn main() { + println!("Hello, world!"); +} diff --git a/examples/protobuf/pb_message/src/pb/person.rs b/examples/protobuf-transport/src/person.rs similarity index 100% rename from examples/protobuf/pb_message/src/pb/person.rs rename to examples/protobuf-transport/src/person.rs diff --git a/examples/protobuf/server/src/main.rs b/examples/protobuf-transport/src/server.rs similarity index 82% rename from examples/protobuf/server/src/main.rs rename to examples/protobuf-transport/src/server.rs index a182144..9b0c443 100644 --- a/examples/protobuf/server/src/main.rs +++ b/examples/protobuf-transport/src/server.rs @@ -5,7 +5,7 @@ use xds::{ server::RpcServer }; #[tokio::main] async fn main() { let addr = SocketAddr::from(([127, 0, 0, 1], 8972)); - let mut server = RpcServer::new(addr); + let server = RpcServer::new(addr); server.start().await; println!("RpcServer ok"); } \ No newline at end of file diff --git a/examples/protobuf/client/Cargo.toml b/examples/protobuf/client/Cargo.toml deleted file mode 100644 index e4c426c..0000000 --- a/examples/protobuf/client/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "protobuf" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -xds = { version = "0.1.0", path = "../../../xds" } -tokio = {version = "1.9.0", features = ["full"]} -pb_message = { version = "0.1.0", path = "../pb_message" } -prost = "0.10.4" -hyper = { version = "0.14", features = ["full"] } - diff --git a/examples/protobuf/client/src/main.rs b/examples/protobuf/client/src/main.rs deleted file mode 100644 index bd6507f..0000000 --- a/examples/protobuf/client/src/main.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::collections::hash_map::HashMap; -use xds::{ client::RpcClient }; -use pb_message::pb::person::Person; -use prost::Message; -use hyper::body::Buf; - -#[tokio::main] -async fn main() { - let task = tokio::spawn(async move { - let mut client = RpcClient::new(String::from("http://127.0.0.1:8972")); - - let service_path = String::from("helloworld"); - let service_method = String::from("hello"); - let metadata = HashMap::new(); - - let mut person = Person::default(); - person.name = "guomiwu".to_string(); - let pbData = person.encode_to_vec(); - - let callResult = client.call(service_path, service_method, &metadata, pbData).await; - let resp = callResult.unwrap(); - - // asynchronously aggregate the chunks of the body - let body = hyper::body::aggregate(resp).await; - let data = body.unwrap().chunk().to_vec(); - - let resPerson = Person::decode(data.as_ref()).unwrap(); - println!("resPerson={:?}", resPerson.name); - - }); - task.await.unwrap(); -} \ No newline at end of file diff --git a/examples/protobuf/pb_message/Cargo.toml b/examples/protobuf/pb_message/Cargo.toml deleted file mode 100644 index f4a97cd..0000000 --- a/examples/protobuf/pb_message/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "pb_message" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -prost = "0.10.4" -prost-types = "0.10.1" - -[build-dependencies] -prost-build = "0.10.4" \ No newline at end of file diff --git a/examples/protobuf/pb_message/build.rs b/examples/protobuf/pb_message/build.rs deleted file mode 100644 index a198077..0000000 --- a/examples/protobuf/pb_message/build.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::io::Result; -use prost_build::Config; - -fn main() -> Result<()> { - - Config::new() - .out_dir("src/pb") - .compile_protos(&["pb/person.proto"], &["pb/"])?; - - // println!("cargo:rerun-if-changed=pb/greeter_rpc.pb"); - // println!("cargo:rerun-if-changed=pb/build.rs"); - - Config::new() - .out_dir("src/pb") - .compile_protos(&["pb/greeter.proto"], &["pb/"])?; - - Ok(()) -} \ No newline at end of file diff --git a/examples/protobuf/pb_message/src/lib.rs b/examples/protobuf/pb_message/src/lib.rs deleted file mode 100644 index 3a90aed..0000000 --- a/examples/protobuf/pb_message/src/lib.rs +++ /dev/null @@ -1,37 +0,0 @@ - -pub mod pb; - -#[cfg(test)] -mod tests { - use crate::pb::Person; - use crate::pb::greeter; - - use std::io::Cursor; - use prost::Message; - - pub fn create_hello_request(name: String) -> greeter::HelloRequest { - let mut hello_request = greeter::HelloRequest::default(); - hello_request.name = name; - hello_request - } - - pub fn serialize_greeter(hello: &greeter::HelloRequest) -> Vec<u8> { - let mut buf = Vec::new(); - buf.reserve(hello.encoded_len()); - - hello.encode(&mut buf).unwrap(); - buf - } - - - // pub fn deserialize_greeter(buf: &[u8]) -> Result<greeter::HelloRequest, prost::DecodeError> { - // greeter::HelloRequest::decode(&mut Cursor::new(buf)) - // } - - #[test] - fn it_works() { - let person = Person::default(); - person.encode_to_vec(); - println!("{person:?}"); - } -} diff --git a/examples/protobuf/pb_message/src/pb/greeter.rs b/examples/protobuf/pb_message/src/pb/greeter.rs deleted file mode 100644 index 36aa34d..0000000 --- a/examples/protobuf/pb_message/src/pb/greeter.rs +++ /dev/null @@ -1,10 +0,0 @@ -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HelloRequest { - #[prost(string, tag="1")] - pub name: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HelloResponse { - #[prost(string, tag="1")] - pub message: ::prost::alloc::string::String, -} diff --git a/examples/protobuf/pb_message/src/pb/mod.rs b/examples/protobuf/pb_message/src/pb/mod.rs deleted file mode 100644 index 8c11e43..0000000 --- a/examples/protobuf/pb_message/src/pb/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod person; -pub mod greeter; - -pub use person::*; -pub use greeter::*; \ No newline at end of file diff --git a/examples/protobuf/server/Cargo.toml b/examples/protobuf/server/Cargo.toml deleted file mode 100644 index fe077e4..0000000 --- a/examples/protobuf/server/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "server" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -xds = { version = "0.1.0", path = "../../../xds" } -tokio = {version = "1.9.0", features = ["full"]} \ No newline at end of file diff --git a/xds/Cargo.toml b/xds/Cargo.toml index 49af168..f224f7c 100644 --- a/xds/Cargo.toml +++ b/xds/Cargo.toml @@ -25,3 +25,6 @@ flate2 = "1.0" tower = "0.4" log = "0.4" env_logger = "0.9.0" + + +prost = "0.10.4" \ No newline at end of file diff --git a/xds/src/error.rs b/xds/src/error.rs new file mode 100644 index 0000000..e212835 --- /dev/null +++ b/xds/src/error.rs @@ -0,0 +1,136 @@ +use serde_json; +use prost::{DecodeError, EncodeError}; +use hyper::{Body, Response, Method, Version, header, StatusCode}; +use crate::response::ServiceResponse; + + +/// A JSON-serializable DBError error +#[derive(Debug)] +pub struct DBError { + pub status: StatusCode, + pub error_type: String, + pub msg: String, + pub meta: Option<serde_json::Value>, +} + +impl DBError { + /// Create a DBError error with no meta + pub fn new(status: StatusCode, error_type: &str, msg: &str) -> DBError { + DBError::new_meta(status, error_type, msg, None) + } + + /// Create a DBError error with optional meta + pub fn new_meta(status: StatusCode, error_type: &str, msg: &str, meta: Option<serde_json::Value>) -> DBError { + DBError { status, error_type: error_type.to_string(), msg: msg.to_string(), meta } + } + + /// Create a byte-array service response for this error and the given status code + pub fn to_resp_raw(&self) -> ServiceResponse<Vec<u8>> { + let output = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec()); + let mut headers = header::HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, "json".parse().unwrap()); + let lenth_value = header::HeaderValue::from(output.len() as u64); + headers.insert(header::CONTENT_LENGTH, lenth_value); + + ServiceResponse { + version: Version::default(), + headers, + status: self.status, + output, + } + } + + /// Create a hyper response for this error and the given status code + pub fn to_hyper_resp(&self) -> Response<Body> { + let body = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec()); + + let mut resp = Response::builder() + .status(self.status) + .body(Body::from(body.clone())).unwrap(); + + let header_mut = resp.headers_mut(); + header_mut.insert(header::CONTENT_TYPE, "json".parse().unwrap()); + let lenth_value = header::HeaderValue::from(body.len() as u64); + header_mut.insert(header::CONTENT_LENGTH, lenth_value); + resp + } + + /// Create error from Serde JSON value + pub fn from_json(status: StatusCode, json: serde_json::Value) -> DBError { + let error_type = json["error_type"].as_str(); + DBError { + status, + error_type: error_type.unwrap_or("<no code>").to_string(), + msg: json["msg"].as_str().unwrap_or("<no message>").to_string(), + // Put the whole thing as meta if there was no type + meta: if error_type.is_some() { json.get("meta").map(|v| v.clone()) } else { Some(json.clone()) }, + } + } + + /// Create error from byte array + pub fn from_json_bytes(status: StatusCode, json: &[u8]) -> serde_json::Result<DBError> { + serde_json::from_slice(json).map(|v| DBError::from_json(status, v)) + } + + /// Create Serde JSON value from error + pub fn to_json(&self) -> serde_json::Value { + let mut props = serde_json::map::Map::new(); + props.insert("error_type".to_string(), serde_json::Value::String(self.error_type.clone())); + props.insert("msg".to_string(), serde_json::Value::String(self.msg.clone())); + if let Some(ref meta) = self.meta { props.insert("meta".to_string(), meta.clone()); } + serde_json::Value::Object(props) + } + + /// Create byte array from error + pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> { + serde_json::to_vec(&self.to_json()) + } +} + +impl From<DBError> for DBProstError { + fn from(v: DBError) -> DBProstError { DBProstError::DBWrapError(v) } +} + + + + + +/// An error that can occur during a call to a service +#[derive(Debug)] +pub enum DBProstError { + /// A standard DBError error with a type, message, and some metadata + DBWrapError(DBError), + /// An error when trying to decode JSON into an error or object + JsonDecodeError(serde_json::Error), + /// An error when trying to encode a protobuf object + ProstEncodeError(EncodeError), + /// An error when trying to decode a protobuf object + ProstDecodeError(DecodeError), + /// A generic hyper error + HyperError(hyper::Error), + /// A wrapper for any of the other `DBProstError`s that also includes request/response info + AfterBodyError { + /// The request or response's raw body before the error happened + body: Vec<u8>, + /// The request method, only present for server errors + method: Option<Method>, + /// The request or response's HTTP version + version: Version, + /// The request or response's headers + headers: header::HeaderMap, + /// The response status, only present for client errors + status: Option<StatusCode>, + /// The underlying error + err: Box<DBProstError>, + }, +} + +impl DBProstError { + /// This same error, or the underlying error if it is an `AfterBodyError` + pub fn root_err(self) -> DBProstError { + match self { + DBProstError::AfterBodyError { err, .. } => err.root_err(), + _ => self + } + } +} \ No newline at end of file diff --git a/xds/src/lib.rs b/xds/src/lib.rs index 4f818f9..d3a7e94 100644 --- a/xds/src/lib.rs +++ b/xds/src/lib.rs @@ -19,6 +19,23 @@ pub mod client; pub mod server; pub mod protocol; + +pub mod request; +pub mod response; +pub mod wrapper; +pub mod error; +pub mod util; + + + +pub use wrapper::*; +pub use request::*; +pub use response::*; +pub use error::*; +pub use util::*; + + + #[cfg(test)] mod tests { use crate::client::client::RpcClient; diff --git a/xds/src/request.rs b/xds/src/request.rs new file mode 100644 index 0000000..4d497c0 --- /dev/null +++ b/xds/src/request.rs @@ -0,0 +1,131 @@ +use hyper::{body, Body, Request, Uri, Method, Version, header}; +use prost::{Message}; +use crate::util::*; +use crate::error::*; + +/// A request with HTTP info and the serialized input object +#[derive(Debug)] +pub struct ServiceRequest<T> { + /// The URI of the original request + /// + /// When using a client, this will be overridden with the proper URI. It is only valuable for servers. + pub uri: Uri, + /// The request method; should always be Post + pub method: Method, + /// The HTTP version, rarely changed from the default + pub version: Version, + /// The set of headers + /// + /// Should always at least have `Content-Type`. Clients will override `Content-Length` on serialization. + pub headers: header::HeaderMap, + // The serialized request object + pub input: T, +} + +impl<T> ServiceRequest<T> { + /// Create new service request with the given input object + /// + /// This automatically sets the `Content-Type` header as `application/protobuf`. + pub fn new(input: T) -> ServiceRequest<T> { + let mut header = header::HeaderMap::new(); + header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap()); + ServiceRequest { + uri: Default::default(), + method: Method::POST, + version: Version::default(), + headers: header, + input, + } + } + + /// Copy this request with a different input value + pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> { + ServiceRequest { + uri: self.uri.clone(), + method: self.method.clone(), + version: self.version, + headers: self.headers.clone(), + input, + } + } +} + +impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> { + fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) } +} + +impl ServiceRequest<Vec<u8>> { + /// Turn a hyper request to a boxed future of a byte-array service request + pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> { + Box::pin(async move { + let uri = req.uri().clone(); + let method = req.method().clone(); + let version = req.version(); + let headers = req.headers().clone(); + let future_req = body::to_bytes(req.into_body()).await + .map_err(DBProstError::HyperError) + .map(move |body| + ServiceRequest { uri, method, version, headers, input: body.to_vec() } + ); + future_req + }) + } + + /// Turn a byte-array service request into a hyper request + pub fn to_hyper_raw(&self) -> Request<Body> { + let mut request = Request::builder() + .method(self.method.clone()) + .uri(self.uri.clone()) + .body(Body::from(self.input.clone())).unwrap(); + + let header_mut = request.headers_mut(); + header_mut.clone_from(&self.headers); + let lenth_value = header::HeaderValue::from(self.input.len() as u64); + header_mut.insert(header::CONTENT_LENGTH, lenth_value); + request + } + + /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error + pub fn body_err(&self, err: DBProstError) -> DBProstError { + DBProstError::AfterBodyError { + body: self.input.clone(), + method: Some(self.method.clone()), + version: self.version, + headers: self.headers.clone(), + status: None, + err: Box::new(err), + } + } + + /// Serialize the byte-array service request into a protobuf service request + pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> { + match T::decode(self.input.as_ref()) { + Ok(v) => Ok(self.clone_with_input(v)), + Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err))) + } + } +} + +impl<T: Message + Default + 'static> ServiceRequest<T> { + /// Turn a protobuf service request into a byte-array service request + pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> { + let mut body = Vec::new(); + if let Err(err) = self.input.encode(&mut body) { + Err(DBProstError::ProstEncodeError(err)) + } else { + Ok(self.clone_with_input(body)) + } + } + + /// Turn a hyper request into a protobuf service request + pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> { + Box::pin(async move { + ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto()) + }) + } + + /// Turn a protobuf service request into a hyper request + pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> { + self.to_proto_raw().map(|v| v.to_hyper_raw()) + } +} \ No newline at end of file diff --git a/xds/src/response.rs b/xds/src/response.rs new file mode 100644 index 0000000..b38db8b --- /dev/null +++ b/xds/src/response.rs @@ -0,0 +1,131 @@ +use hyper::{body, Body, Response, Version, header, StatusCode}; +use prost::{Message}; +use crate::util::*; +use crate::error::*; + + +/// A response with HTTP info and a serialized output object +#[derive(Debug)] +pub struct ServiceResponse<T> { + /// The HTTP version + pub version: Version, + /// The set of headers + /// + /// Should always at least have `Content-Type`. Servers will override `Content-Length` on serialization. + pub headers: header::HeaderMap, + /// The status code + pub status: StatusCode, + /// The serialized output object + pub output: T, +} + +impl<T> ServiceResponse<T> { + /// Create new service request with the given input object + /// + /// This automatically sets the `Content-Type` header as `application/protobuf`. + pub fn new(output: T) -> ServiceResponse<T> { + let mut headers = header::HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap()); + + ServiceResponse { + version: Version::default(), + headers, + status: StatusCode::OK, + output, + } + } + + /// Copy this response with a different output value + pub fn clone_with_output<U>(&self, output: U) -> ServiceResponse<U> { + ServiceResponse { version: self.version, headers: self.headers.clone(), status: self.status, output } + } +} + +impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> { + fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) } +} + +impl ServiceResponse<Vec<u8>> { + /// Turn a hyper response to a boxed future of a byte-array service response + pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> { + Box::pin(async move { + let version = resp.version(); + let headers = resp.headers().clone(); + let status = resp.status().clone(); + let future_resp = body::to_bytes(resp.into_body()).await + .map_err(DBProstError::HyperError) + .map(move |body| + ServiceResponse { version, headers, status, output: body.to_vec() } + ); + future_resp + }) + } + + /// Turn a byte-array service response into a hyper response + pub fn to_hyper_raw(&self) -> Response<Body> { + let mut resp = Response::builder() + .status(StatusCode::OK) + .body(Body::from(self.output.clone())).unwrap(); + + let header_mut = resp.headers_mut(); + header_mut.clone_from(&self.headers); + let lenth_value = header::HeaderValue::from(self.output.len() as u64); + header_mut.insert(header::CONTENT_LENGTH, lenth_value); + resp + } + + /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error + pub fn body_err(&self, err: DBProstError) -> DBProstError { + DBProstError::AfterBodyError { + body: self.output.clone(), + method: None, + version: self.version, + headers: self.headers.clone(), + status: Some(self.status), + err: Box::new(err), + } + } + + + /// Serialize the byte-array service response into a protobuf service response + pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> { + if self.status.is_success() { + match T::decode(self.output.as_ref()) { + Ok(v) => Ok(self.clone_with_output(v)), + Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err))) + } + } else { + match DBError::from_json_bytes(self.status, &self.output) { + Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))), + Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err))) + } + } + } +} + +impl<T: Message + Default + 'static> ServiceResponse<T> { + /// Turn a protobuf service response into a byte-array service response + pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> { + let mut body = Vec::new(); + if let Err(err) = self.output.encode(&mut body) { + Err(DBProstError::ProstEncodeError(err)) + } else { + Ok(self.clone_with_output(body)) + } + } + + + /// Turn a hyper response into a protobuf service response + pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> { + // Box::pin(async move { + ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto()) + // }) + } + + + /// Turn a protobuf service response into a hyper response + pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> { + self.to_proto_raw().map(|v| v.to_hyper_raw()) + } +} + diff --git a/xds/src/util.rs b/xds/src/util.rs new file mode 100644 index 0000000..2fbe3f3 --- /dev/null +++ b/xds/src/util.rs @@ -0,0 +1,9 @@ +use futures::future::BoxFuture; +use crate::request::ServiceRequest; +use crate::response::ServiceResponse; +use crate::error::DBProstError; + + +pub type BoxFutureReq<T> = BoxFuture<'static, Result<ServiceRequest<T>, DBProstError>>; +pub type BoxFutureResp<O> = BoxFuture<'static, Result<ServiceResponse<O>, DBProstError>>; + diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs new file mode 100644 index 0000000..b80ffa8 --- /dev/null +++ b/xds/src/wrapper.rs @@ -0,0 +1,110 @@ +use std::sync::Arc; +use std::task::Poll; +use futures::future::BoxFuture; + +use hyper::{Body, Request, Response, header, StatusCode}; + +use hyper::client::conn::Builder; +use hyper::client::connect::HttpConnector; +use hyper::client::service::Connect; + +use tower::Service; +use prost::{Message}; + +use crate::request::ServiceRequest; +use crate::response::ServiceResponse; +use crate::util::*; +use crate::error::*; + + + +/// A wrapper for a hyper client +#[derive(Debug)] +pub struct HyperClient { + /// The hyper client + /// The root URL without any path attached + pub root_url: String, +} + +impl HyperClient { + /// Create a new client wrapper for the given client and root using protobuf + pub fn new(root_url: &str) -> HyperClient { + HyperClient { + root_url: root_url.trim_end_matches('/').to_string(), + } + } + + // Invoke the given request for the given path and return a boxed future result + pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError> + where I: Message + Default + 'static, O: Message + Default + 'static { + let hyper_req = req.to_hyper_proto().unwrap(); + let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new()); + + let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/')); + let uri = url_string.parse::<hyper::Uri>().unwrap(); + + let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap(); + + let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap(); + ServiceResponse::from_hyper_proto(hyper_resp).await + } +} + +/// Service for taking a raw service request and returning a boxed future of a raw service response +pub trait HyperService { + /// Accept a raw service request and return a boxed future of a raw service response + fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>; +} + +/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service +pub struct HyperServer<T> { + /// The `Arc` version of the service + /// + /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9) + pub service: Arc<T>, +} + +impl<T> HyperServer<T> { + /// Create a new service wrapper for the given impl + pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } } +} + +impl<T> Service<Request<Body>> for HyperServer<T> + where T: 'static + Send + Sync + HyperService +{ + type Response = Response<Body>; + type Error = hyper::Error; + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap(); + if content_type == "application/protobuf" { + Box::pin(async move { + let status = StatusCode::UNSUPPORTED_MEDIA_TYPE; + Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp()) + }) + } else { + let service = self.service.clone(); + Box::pin(async move { + let request = ServiceRequest::from_hyper_raw(req).await; + let resp = service.handle(request.unwrap()).await; + resp.map(|v| v.to_hyper_raw()) + .or_else(|err| match err.root_err() { + DBProstError::ProstDecodeError(_) => + Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body"). + to_hyper_resp()), + DBProstError::DBWrapError(err) => + Ok(err.to_hyper_resp()), + DBProstError::HyperError(err) => + Err(err), + _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error"). + to_hyper_resp()), + }) + }) + } + } +} \ No newline at end of file
