This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-idl in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 60bfedf3134569b97dd7648cc747ebb05476250c Author: hxq <[email protected]> AuthorDate: Thu Jul 21 16:25:11 2022 +0800 finished 1.0.0 --- .github/.DS_Store | Bin 0 -> 6148 bytes dubbo-build/.DS_Store | Bin 0 -> 6148 bytes dubbo-build/src/generator.rs | 106 +++++++++++++++++------- dubbo-build/src/lib.rs | 17 +++- examples/.DS_Store | Bin 0 -> 6148 bytes examples/grpc-gen/.DS_Store | Bin 0 -> 6148 bytes examples/grpc-gen/Cargo.toml | 14 +++- examples/grpc-gen/build.rs | 3 +- examples/grpc-gen/src/.DS_Store | Bin 0 -> 6148 bytes examples/grpc-gen/src/client.rs | 10 +++ examples/grpc-gen/src/greeter.rs | 68 +++++++++++----- examples/grpc-gen/src/lib.rs | 6 ++ examples/grpc-gen/src/main.rs | 12 --- examples/grpc-gen/src/server.rs | 38 +++++++++ examples/protobuf-transport/src/lib.rs | 2 +- xds/.DS_Store | Bin 0 -> 6148 bytes xds/src/.DS_Store | Bin 0 -> 6148 bytes xds/src/client/client.rs | 34 +++++++- xds/src/error.rs | 17 ++++ xds/src/lib.rs | 3 - xds/src/protocol/error.rs | 17 ++++ xds/src/protocol/message.rs | 17 ++++ xds/src/protocol/mod.rs | 17 ++++ xds/src/request.rs | 143 ++++++++++++++++----------------- xds/src/response.rs | 116 +++++++++++++------------- xds/src/server/.DS_Store | Bin 0 -> 6148 bytes xds/src/server/server.rs | 17 ++-- xds/src/util.rs | 17 ++++ xds/src/wrapper.rs | 110 ------------------------- 29 files changed, 467 insertions(+), 317 deletions(-) diff --git a/.github/.DS_Store b/.github/.DS_Store new file mode 100644 index 0000000..3bb81dc Binary files /dev/null and b/.github/.DS_Store differ diff --git a/dubbo-build/.DS_Store b/dubbo-build/.DS_Store new file mode 100644 index 0000000..cd16f51 Binary files /dev/null and b/dubbo-build/.DS_Store differ diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs index 05e25cf..79fa172 100644 --- a/dubbo-build/src/generator.rs +++ b/dubbo-build/src/generator.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use prost_build::{Method, Service, ServiceGenerator}; #[derive(Default)] @@ -8,17 +25,32 @@ impl CodeGenerator { fn generate_module_name(&self) -> &str { "xds" } - fn generate_type_aliases(&mut self, buf: &mut String) { + fn generate_uses(&mut self, buf: &mut String) { buf.push_str(&format!( "\n\ - pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\ - pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n", + use async_trait::async_trait;\n\ + use tower::Service;\n\ + use hyper::{{\n \ + Body,\n \ + Request,\n \ + Response,\n\ + }};\n\ + use futures::future::{{\n \ + BoxFuture\n\ + }};\n\ + use std::{{\n \ + task::{{Poll}},\n\ + }};\n")); + } + + fn generate_type_aliases(&mut self, buf: &mut String) { + buf.push_str(&format!( + "\npub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n", self.generate_module_name())); } fn generate_main_trait(&self, service: &Service, buf: &mut String) { - buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n"); - + buf.push_str("\n\n#[async_trait]\n"); service.comments.append_with_indent(0, buf); buf.push_str(&format!("pub trait {} {{", service.name)); for method in service.methods.iter() { @@ -30,7 +62,7 @@ impl CodeGenerator { } fn method_sig(&self, method: &Method) -> String { - format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>", + format!("async fn {0}(&self, request: {1}) -> DBResp<{2}>", method.name, method.input_type, method.output_type) } @@ -38,7 +70,7 @@ impl CodeGenerator { buf.push_str(&format!( "\n\ pub struct {0}Client {{\n \ - pub hyper_client: {1}::wrapper::HyperClient \n\ + pub rpc_client: {1}::client::RpcClient \n\ }}\n",service.name, self.generate_module_name())); } @@ -46,9 +78,9 @@ impl CodeGenerator { buf.push_str(&format!( "\n\ impl {0}Client {{\n \ - pub fn new(root_url: &str) -> {0}Client {{\n \ + pub fn new(addr: String) -> {0}Client {{\n \ {0}Client {{\n \ - hyper_client: {1}::wrapper::HyperClient::new(root_url) \n \ + rpc_client: {1}::client::RpcClient::new(addr) \n \ }}\n \ }}\n\ }}\n", service.name, self.generate_module_name())); @@ -58,8 +90,9 @@ impl CodeGenerator { for method in service.methods.iter() { buf.push_str(&format!( - "\n {} {{\n \ - self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n \ + "\n {0} {{\n \ + let path = \"dubbo/{1}.{2}/{3}\".to_owned();\n \ + self.rpc_client.request(request, path).await\n \ }}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name)); } buf.push_str("}\n"); @@ -69,50 +102,64 @@ impl CodeGenerator { buf.push_str(&format!( "\n\ pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n \ - pub hyper_server: {1}::wrapper::HyperServer<T> \n\ - }}\n",service.name, self.generate_module_name())); + pub inner: T\n\ + }}\n",service.name)); } fn generate_server_impl(&self, service: &Service, buf: &mut String) { buf.push_str(&format!( "\n\ impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n \ - pub fn new(&self, service: T) -> {0}Server<T> {{\n \ + pub fn new(service: T) -> {0}Server<T> {{\n \ {0}Server {{\n \ - hyper_server: {1}::wrapper::HyperServer::new(service) \n \ + inner: service \n \ }}\n \ }}\n\ }}\n", - service.name, self.generate_module_name())); + service.name)); buf.push_str(&format!( "\n\ - impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n \ - fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n \ - use ::futures::Future;\n \ - let trait_object_service = self.hyper_server.service.clone();\n \ - match (req.method.clone(), req.uri.path()) {{", - service.name, self.generate_module_name())); + impl<T> Service<Request<Body>> for {0}Server<T> \n \ + where T: 'static + Greeter + Send + Sync + Clone {{\n \ + type Response = Response<Body>;\n \ + type Error = hyper::Error;\n \ + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;\n \ + \n \ + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {{\n \ + Poll::Ready(Ok(()))\n \ + }}\n \ + + \n \ + + fn call(&mut self, req: Request<Body>) -> Self::Future {{\n \ + let inner = self.inner.clone();\n \ + let url = req.uri().path();\n \ + match (req.method().clone(), url) {{", + service.name)); // Make match arms for each type for method in service.methods.iter() { buf.push_str(&format!( "\n \ (::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n \ - Box::pin(async move {{ \n \ - let proto_req = req.to_proto().unwrap(); \n \ - let resp = trait_object_service.{3}(proto_req).await.unwrap(); \n \ - let proto_resp = resp.to_proto_raw(); \n \ - proto_resp \n \ + Box::pin(async move {{\n \ + let request = {3}::ServiceRequest::try_from_hyper(req).await;\n \ + let proto_req = request.unwrap().try_decode().unwrap();\n \ + let resp = inner.{4}(proto_req.input).await.unwrap();\n \ + let proto_resp = resp.try_encode();\n \ + let hyper_resp = proto_resp.unwrap().into_hyper();\n \ + Ok(hyper_resp) \n \ }}) \n \ - }},", service.package, service.proto_name, method.proto_name, method.name)); + }},", service.package, service.proto_name, method.proto_name, self.generate_module_name(), method.name)); } // Final 404 arm and end fn buf.push_str(&format!( "\n \ _ => {{\n \ Box::pin(async move {{ \n \ - Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n \ + Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_hyper_resp()) \n \ }}) \n \ }}\n \ }}\n \ @@ -123,6 +170,7 @@ impl CodeGenerator { impl ServiceGenerator for CodeGenerator { fn generate(&mut self, service: Service, buf: &mut String) { + self.generate_uses(buf); self.generate_type_aliases(buf); self.generate_main_trait(&service, buf); self.generate_client_struct(&service, buf); diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs index 329eb39..225d4f6 100644 --- a/dubbo-build/src/lib.rs +++ b/dubbo-build/src/lib.rs @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #[cfg(feature = "generator-code")] extern crate prost_build; diff --git a/examples/.DS_Store b/examples/.DS_Store new file mode 100644 index 0000000..0d3b142 Binary files /dev/null and b/examples/.DS_Store differ diff --git a/examples/grpc-gen/.DS_Store b/examples/grpc-gen/.DS_Store new file mode 100644 index 0000000..b57983e Binary files /dev/null and b/examples/grpc-gen/.DS_Store differ diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml index 335c81c..7122d68 100644 --- a/examples/grpc-gen/Cargo.toml +++ b/examples/grpc-gen/Cargo.toml @@ -6,13 +6,25 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio = {version = "1.9.0", features = ["full"]} futures = "0.3.21" prost = "0.10.4" async-trait = { version = "0.1.56" } xds = { version = "0.1.0", path = "../../xds" } - hyper = { version = "0.14.19", features = ["full"] } +pin-project = "1.0.11" +tower = "0.4" + +[lib] +path = "./src/lib.rs" + +[[bin]] +name="server" +path="./src/server.rs" +[[bin]] +name="client" +path="./src/client.rs" [build-dependencies] diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs index 0098c18..7a3e44f 100644 --- a/examples/grpc-gen/build.rs +++ b/examples/grpc-gen/build.rs @@ -3,10 +3,9 @@ use prost_build::Config; fn main() -> Result<()> { let mut conf = Config::new(); - let mut gen = dubbo_build::CodeGenerator::new(); + let gen = dubbo_build::CodeGenerator::new(); conf.service_generator(Box::new(gen)); conf.out_dir("src/"); conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap(); - Ok(()) } \ No newline at end of file diff --git a/examples/grpc-gen/src/.DS_Store b/examples/grpc-gen/src/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/examples/grpc-gen/src/.DS_Store differ diff --git a/examples/grpc-gen/src/client.rs b/examples/grpc-gen/src/client.rs new file mode 100644 index 0000000..0f846a8 --- /dev/null +++ b/examples/grpc-gen/src/client.rs @@ -0,0 +1,10 @@ + +mod greeter; +use greeter::*; +#[tokio::main] +async fn main() { + let client = GreeterClient::new("http://127.0.0.1:8972".to_owned()); + let resp = client.say_hello(HelloRequest { name: "johankoi".into()}).await.unwrap(); + let hello_resp = resp.output; + println!("{}",hello_resp.message); +} \ No newline at end of file diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs index 3ecb8f2..9895a76 100644 --- a/examples/grpc-gen/src/greeter.rs +++ b/examples/grpc-gen/src/greeter.rs @@ -9,63 +9,87 @@ pub struct HelloResponse { pub message: ::prost::alloc::string::String, } -pub type DBReq<I> = xds::request::ServiceRequest<I>; +use async_trait::async_trait; +use tower::Service; +use hyper::{ + Body, + Request, + Response, +}; +use futures::future::{ + BoxFuture +}; +use std::{ + task::{Poll}, +}; + pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>; -use async_trait::async_trait; #[async_trait] pub trait Greeter { - async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>; + async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse>; } pub struct GreeterClient { - pub hyper_client: xds::wrapper::HyperClient + pub rpc_client: xds::client::RpcClient } impl GreeterClient { - pub fn new(root_url: &str) -> GreeterClient { + pub fn new(addr: String) -> GreeterClient { GreeterClient { - hyper_client: xds::wrapper::HyperClient::new(root_url) + rpc_client: xds::client::RpcClient::new(addr) } } } #[async_trait] impl Greeter for GreeterClient { - async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> { - self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await + async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> { + let path = "dubbo/greeter.Greeter/SayHello".to_owned(); + self.rpc_client.request(request, path).await } } pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> { - pub hyper_server: xds::wrapper::HyperServer<T> + pub inner: T } impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> { - pub fn new(&self, service: T) -> GreeterServer<T> { + pub fn new(service: T) -> GreeterServer<T> { GreeterServer { - hyper_server: xds::wrapper::HyperServer::new(service) + inner: service } } } -impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> { - fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> { - use ::futures::Future; - let trait_object_service = self.hyper_server.service.clone(); - match (req.method.clone(), req.uri.path()) { +impl<T> Service<Request<Body>> for GreeterServer<T> + where T: 'static + Greeter + Send + Sync + Clone { + type Response = Response<Body>; + type Error = hyper::Error; + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + let inner = self.inner.clone(); + let url = req.uri().path(); + match (req.method().clone(), url) { (::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => { - Box::pin(async move { - let proto_req = req.to_proto().unwrap(); - let resp = trait_object_service.say_hello(proto_req).await.unwrap(); - let proto_resp = resp.to_proto_raw(); - proto_resp + Box::pin(async move { + let request = xds::ServiceRequest::try_from_hyper(req).await; + let proto_req = request.unwrap().try_decode().unwrap(); + let resp = inner.say_hello(proto_req.input).await.unwrap(); + let proto_resp = resp.try_encode(); + let hyper_resp = proto_resp.unwrap().into_hyper(); + Ok(hyper_resp) }) }, _ => { Box::pin(async move { - Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw()) + Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_hyper_resp()) }) } } diff --git a/examples/grpc-gen/src/lib.rs b/examples/grpc-gen/src/lib.rs new file mode 100644 index 0000000..c032559 --- /dev/null +++ b/examples/grpc-gen/src/lib.rs @@ -0,0 +1,6 @@ + + +fn main() { + + +} diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs deleted file mode 100644 index 68b341a..0000000 --- a/examples/grpc-gen/src/main.rs +++ /dev/null @@ -1,12 +0,0 @@ - -mod greeter; -use greeter::*; - - -fn main() { - - let client = GreeterClient::new("0.0.0.0:8080"); - let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()}); - client.say_hello(hello_req); - -} diff --git a/examples/grpc-gen/src/server.rs b/examples/grpc-gen/src/server.rs new file mode 100644 index 0000000..3c15554 --- /dev/null +++ b/examples/grpc-gen/src/server.rs @@ -0,0 +1,38 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use async_trait::async_trait; +use hyper::{Server as hyper_server}; +use hyper::service::{make_service_fn}; + +mod greeter; +use greeter::*; +use xds::ServiceResponse; + + +#[derive(Default, Clone)] +pub struct HelloService {} + +#[async_trait] +impl Greeter for HelloService { + async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> { + println!("{}",request.name); + Ok(ServiceResponse::new(HelloResponse { message: "hello, dubbo rust!".into() })) + } +} + + +#[tokio::main] +async fn main() { + let addr = SocketAddr::from(([127, 0, 0, 1], 8972)); + let make_service = make_service_fn(|_conn| async { + let server = GreeterServer::new(HelloService::default()); + Ok::<_, Infallible>(server) + }); + + let server = hyper_server::bind(&addr).serve(make_service); + println!("Listening on http://{}", &addr); + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +} + diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs index 959e687..5e94390 100644 --- a/examples/protobuf-transport/src/lib.rs +++ b/examples/protobuf-transport/src/lib.rs @@ -3,5 +3,5 @@ pub mod person; pub use person::*; fn main() { - println!("Hello, world!"); + } diff --git a/xds/.DS_Store b/xds/.DS_Store new file mode 100644 index 0000000..b8d457d Binary files /dev/null and b/xds/.DS_Store differ diff --git a/xds/src/.DS_Store b/xds/src/.DS_Store new file mode 100644 index 0000000..05fc01e Binary files /dev/null and b/xds/src/.DS_Store differ diff --git a/xds/src/client/client.rs b/xds/src/client/client.rs index 1387e36..3c0b655 100644 --- a/xds/src/client/client.rs +++ b/xds/src/client/client.rs @@ -21,8 +21,21 @@ use hyper::client::service::Connect; use hyper::service::Service; use hyper::{Body, Request, Response}; -use crate::protocol::message::*; +use crate::protocol::message::{ + Message as DubboMessage, + RpcxMessage, + Metadata, + MessageType, + CompressType, + SerializeType +}; + use std::collections::HashMap; +use prost::{Message}; + +use crate::request::ServiceRequest; +use crate::response::ServiceResponse; +use crate::error::*; pub struct RpcClient { @@ -36,6 +49,23 @@ impl RpcClient { } } + /// Invoke the given request for the given path and return a result of Result<ServiceResponse<O>, DBProstError> + pub async fn request<I, O>(&self, message: I, path: String) -> Result<ServiceResponse<O>, DBProstError> + where I: Message + Default + 'static, + O: Message + Default + 'static { + + let url_str = format!("{}/{}", self.addr.as_str(), path.as_str()); + let uri = url_str.parse::<hyper::Uri>().unwrap(); + + let req = ServiceRequest::new(message, uri.clone()); + let hyper_req = req.into_encoded_hyper().unwrap(); + + let mut connect = Connect::new(HttpConnector::new(), Builder::new()); + let mut send = connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap(); + let hyper_resp = send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap(); + ServiceResponse::decode_response(hyper_resp).await + } + pub async fn call( &mut self, service_path: String, @@ -44,7 +74,7 @@ impl RpcClient { payload: Vec<u8> ) -> std::result::Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> { - let mut req = Message::new(); + let mut req = DubboMessage::new(); req.set_version(0); req.set_message_type(MessageType::Request); req.set_serialize_type(SerializeType::Protobuf); diff --git a/xds/src/error.rs b/xds/src/error.rs index e212835..2748a2e 100644 --- a/xds/src/error.rs +++ b/xds/src/error.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use serde_json; use prost::{DecodeError, EncodeError}; use hyper::{Body, Response, Method, Version, header, StatusCode}; diff --git a/xds/src/lib.rs b/xds/src/lib.rs index d3a7e94..ba2a6b5 100644 --- a/xds/src/lib.rs +++ b/xds/src/lib.rs @@ -22,13 +22,10 @@ pub mod protocol; pub mod request; pub mod response; -pub mod wrapper; pub mod error; pub mod util; - -pub use wrapper::*; pub use request::*; pub use response::*; pub use error::*; diff --git a/xds/src/protocol/error.rs b/xds/src/protocol/error.rs index c1cdd8f..1496243 100644 --- a/xds/src/protocol/error.rs +++ b/xds/src/protocol/error.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use std::{convert::From, error, fmt, result, str}; pub type Result<T> = result::Result<T, Error>; diff --git a/xds/src/protocol/message.rs b/xds/src/protocol/message.rs index 1895172..52bb806 100644 --- a/xds/src/protocol/message.rs +++ b/xds/src/protocol/message.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use byteorder::{BigEndian, ByteOrder}; use enum_primitive_derive::Primitive; use flate2::{read::GzDecoder, write::GzEncoder, Compression}; diff --git a/xds/src/protocol/mod.rs b/xds/src/protocol/mod.rs index 1d650a8..b9eaa8f 100644 --- a/xds/src/protocol/mod.rs +++ b/xds/src/protocol/mod.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + pub mod error; pub mod message; diff --git a/xds/src/request.rs b/xds/src/request.rs index 4d497c0..c27aa91 100644 --- a/xds/src/request.rs +++ b/xds/src/request.rs @@ -1,6 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::convert::From; use hyper::{body, Body, Request, Uri, Method, Version, header}; use prost::{Message}; -use crate::util::*; use crate::error::*; /// A request with HTTP info and the serialized input object @@ -22,15 +39,14 @@ pub struct ServiceRequest<T> { pub input: T, } -impl<T> ServiceRequest<T> { +impl<T: Message + Default + 'static> ServiceRequest<T> { /// Create new service request with the given input object - /// /// This automatically sets the `Content-Type` header as `application/protobuf`. - pub fn new(input: T) -> ServiceRequest<T> { + pub fn new(input: T, uri: Uri) -> ServiceRequest<T> { let mut header = header::HeaderMap::new(); header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap()); ServiceRequest { - uri: Default::default(), + uri, method: Method::POST, version: Version::default(), headers: header, @@ -38,41 +54,41 @@ impl<T> ServiceRequest<T> { } } - /// Copy this request with a different input value - pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> { - ServiceRequest { - uri: self.uri.clone(), - method: self.method.clone(), - version: self.version, - headers: self.headers.clone(), - input, - } + /// Turn a protobuf service request into a byte-array service request + pub fn try_encode(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> { + let mut body = Vec::new(); + self.input.encode(&mut body) + .map(|v| + ServiceRequest { + uri: self.uri.clone(), + method: self.method.clone(), + version: self.version, + headers: self.headers.clone(), + input: body, + }) + .map_err(|e| DBProstError::ProstEncodeError(e)) } -} -impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> { - fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) } + /// Turn a protobuf service request into a hyper request + pub fn into_encoded_hyper(&self) -> Result<Request<Body>, DBProstError> { + self.try_encode().map(|v| v.into_hyper()) + } } impl ServiceRequest<Vec<u8>> { - /// Turn a hyper request to a boxed future of a byte-array service request - pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> { - Box::pin(async move { - let uri = req.uri().clone(); - let method = req.method().clone(); - let version = req.version(); - let headers = req.headers().clone(); - let future_req = body::to_bytes(req.into_body()).await - .map_err(DBProstError::HyperError) - .map(move |body| - ServiceRequest { uri, method, version, headers, input: body.to_vec() } - ); - future_req - }) + /// Turn a hyper request to ServiceRequest<Vec<u8>> + pub async fn try_from_hyper(req: Request<Body>) -> Result<ServiceRequest<Vec<u8>>, DBProstError> { + let uri = req.uri().clone(); + let method = req.method().clone(); + let version = req.version(); + let headers = req.headers().clone(); + body::to_bytes(req.into_body()).await + .map_err(DBProstError::HyperError) + .map(move |body| ServiceRequest { uri, method, version, headers, input: body.to_vec() }) } - /// Turn a byte-array service request into a hyper request - pub fn to_hyper_raw(&self) -> Request<Body> { + /// Turn ServiceRequest<Vec<u8>> into a hyper request + pub fn into_hyper(&self) -> Request<Body> { let mut request = Request::builder() .method(self.method.clone()) .uri(self.uri.clone()) @@ -85,47 +101,26 @@ impl ServiceRequest<Vec<u8>> { request } - /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error - pub fn body_err(&self, err: DBProstError) -> DBProstError { - DBProstError::AfterBodyError { - body: self.input.clone(), - method: Some(self.method.clone()), - version: self.version, - headers: self.headers.clone(), - status: None, - err: Box::new(err), - } - } - - /// Serialize the byte-array service request into a protobuf service request - pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> { - match T::decode(self.input.as_ref()) { - Ok(v) => Ok(self.clone_with_input(v)), - Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err))) - } + /// try_decode ServiceRequest<Vec<u8>> whose input is protobuf data into a ServiceRequest<T> + pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> { + T::decode(self.input.as_ref()) + .map(|v| + ServiceRequest { + uri: self.uri.clone(), + method: self.method.clone(), + version: self.version, + headers: self.headers.clone(), + input:v, + }) + .map_err(|e| + DBProstError::AfterBodyError { + body: self.input.clone(), + method: Some(self.method.clone()), + version: self.version, + headers: self.headers.clone(), + status: None, + err: Box::new(DBProstError::ProstDecodeError(e)), + }) } } -impl<T: Message + Default + 'static> ServiceRequest<T> { - /// Turn a protobuf service request into a byte-array service request - pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> { - let mut body = Vec::new(); - if let Err(err) = self.input.encode(&mut body) { - Err(DBProstError::ProstEncodeError(err)) - } else { - Ok(self.clone_with_input(body)) - } - } - - /// Turn a hyper request into a protobuf service request - pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> { - Box::pin(async move { - ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto()) - }) - } - - /// Turn a protobuf service request into a hyper request - pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> { - self.to_proto_raw().map(|v| v.to_hyper_raw()) - } -} \ No newline at end of file diff --git a/xds/src/response.rs b/xds/src/response.rs index b38db8b..96f02e6 100644 --- a/xds/src/response.rs +++ b/xds/src/response.rs @@ -1,6 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use hyper::{body, Body, Response, Version, header, StatusCode}; use prost::{Message}; -use crate::util::*; use crate::error::*; @@ -41,28 +57,23 @@ impl<T> ServiceResponse<T> { } } -impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> { - fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) } -} +// impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> { +// fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) } +// } impl ServiceResponse<Vec<u8>> { /// Turn a hyper response to a boxed future of a byte-array service response - pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> { - Box::pin(async move { - let version = resp.version(); - let headers = resp.headers().clone(); - let status = resp.status().clone(); - let future_resp = body::to_bytes(resp.into_body()).await - .map_err(DBProstError::HyperError) - .map(move |body| - ServiceResponse { version, headers, status, output: body.to_vec() } - ); - future_resp - }) + pub async fn try_from_hyper(resp: Response<Body>) -> Result<ServiceResponse<Vec<u8>>, DBProstError> { + let version = resp.version(); + let headers = resp.headers().clone(); + let status = resp.status().clone(); + body::to_bytes(resp.into_body()).await + .map_err(DBProstError::HyperError) + .map(move |body| ServiceResponse { version, headers, status, output: body.to_vec() }) } /// Turn a byte-array service response into a hyper response - pub fn to_hyper_raw(&self) -> Response<Body> { + pub fn into_hyper(&self) -> Response<Body> { let mut resp = Response::builder() .status(StatusCode::OK) .body(Body::from(self.output.clone())).unwrap(); @@ -74,58 +85,55 @@ impl ServiceResponse<Vec<u8>> { resp } - /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error - pub fn body_err(&self, err: DBProstError) -> DBProstError { - DBProstError::AfterBodyError { - body: self.output.clone(), - method: None, - version: self.version, - headers: self.headers.clone(), - status: Some(self.status), - err: Box::new(err), - } - } - - /// Serialize the byte-array service response into a protobuf service response - pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> { + pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> { if self.status.is_success() { - match T::decode(self.output.as_ref()) { - Ok(v) => Ok(self.clone_with_output(v)), - Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err))) - } + T::decode(self.output.as_ref()) + .map(|v| + ServiceResponse { + version: self.version, + headers: self.headers.clone(), + status: self.status.clone(), + output: v, + }) + .map_err(|e| + DBProstError::AfterBodyError { + body: self.output.clone(), + method: None, + version: self.version, + headers: self.headers.clone(), + status: Some(self.status), + err: Box::new(DBProstError::ProstDecodeError(e)), + }) } else { - match DBError::from_json_bytes(self.status, &self.output) { - Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))), - Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err))) - } + let err= DBError::new(self.status.clone(), "internal_err", "Internal Error"); + Err(DBProstError::DBWrapError(err)) } } } impl<T: Message + Default + 'static> ServiceResponse<T> { - /// Turn a protobuf service response into a byte-array service response - pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> { + pub fn try_encode(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> { let mut body = Vec::new(); - if let Err(err) = self.output.encode(&mut body) { - Err(DBProstError::ProstEncodeError(err)) - } else { - Ok(self.clone_with_output(body)) - } + self.output.encode(&mut body) + .map(|v| + ServiceResponse { + version: self.version, + headers: self.headers.clone(), + status: self.status.clone(), + output: body, + }) + .map_err(|e| DBProstError::ProstEncodeError(e)) } - /// Turn a hyper response into a protobuf service response - pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> { - // Box::pin(async move { - ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto()) - // }) + pub async fn decode_response(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> { + ServiceResponse::try_from_hyper(resp).await.and_then(|v| v.try_decode()) } - /// Turn a protobuf service response into a hyper response - pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> { - self.to_proto_raw().map(|v| v.to_hyper_raw()) + pub fn into_encoded_hyper(&self) -> Result<Response<Body>, DBProstError> { + self.try_encode().map(|v| v.into_hyper()) } } diff --git a/xds/src/server/.DS_Store b/xds/src/server/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/xds/src/server/.DS_Store differ diff --git a/xds/src/server/server.rs b/xds/src/server/server.rs index dd8930a..f27bc0a 100644 --- a/xds/src/server/server.rs +++ b/xds/src/server/server.rs @@ -16,14 +16,11 @@ */ use std::convert::Infallible; -use std::env; -use std::process::Output; use std::task::Poll; -use hyper::client::service; use hyper::{Body, Request, Response, Server as hyper_server}; -use hyper::service::{make_service_fn, service_fn}; +use hyper::service::{ make_service_fn, service_fn }; use tower::Service; -use futures::future::{ ready, Ready, BoxFuture}; +use futures::future::{ ready, BoxFuture }; use std::net::SocketAddr; use crate::protocol::message::*; @@ -38,8 +35,16 @@ impl RpcServer { } } - pub async fn start(&self) { + pub async fn start<S>(&self, service: S) + where + S: Service<Request<Body>, Response = Response<Body>, Error = Infallible> + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { env_logger::init(); + let make_service = make_service_fn(|_conn| async { let svc = RPCHandler; Ok::<_, Infallible>(svc) diff --git a/xds/src/util.rs b/xds/src/util.rs index 2fbe3f3..39374fb 100644 --- a/xds/src/util.rs +++ b/xds/src/util.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use futures::future::BoxFuture; use crate::request::ServiceRequest; use crate::response::ServiceResponse; diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs deleted file mode 100644 index b80ffa8..0000000 --- a/xds/src/wrapper.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::sync::Arc; -use std::task::Poll; -use futures::future::BoxFuture; - -use hyper::{Body, Request, Response, header, StatusCode}; - -use hyper::client::conn::Builder; -use hyper::client::connect::HttpConnector; -use hyper::client::service::Connect; - -use tower::Service; -use prost::{Message}; - -use crate::request::ServiceRequest; -use crate::response::ServiceResponse; -use crate::util::*; -use crate::error::*; - - - -/// A wrapper for a hyper client -#[derive(Debug)] -pub struct HyperClient { - /// The hyper client - /// The root URL without any path attached - pub root_url: String, -} - -impl HyperClient { - /// Create a new client wrapper for the given client and root using protobuf - pub fn new(root_url: &str) -> HyperClient { - HyperClient { - root_url: root_url.trim_end_matches('/').to_string(), - } - } - - // Invoke the given request for the given path and return a boxed future result - pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError> - where I: Message + Default + 'static, O: Message + Default + 'static { - let hyper_req = req.to_hyper_proto().unwrap(); - let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new()); - - let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/')); - let uri = url_string.parse::<hyper::Uri>().unwrap(); - - let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap(); - - let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap(); - ServiceResponse::from_hyper_proto(hyper_resp).await - } -} - -/// Service for taking a raw service request and returning a boxed future of a raw service response -pub trait HyperService { - /// Accept a raw service request and return a boxed future of a raw service response - fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>; -} - -/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service -pub struct HyperServer<T> { - /// The `Arc` version of the service - /// - /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9) - pub service: Arc<T>, -} - -impl<T> HyperServer<T> { - /// Create a new service wrapper for the given impl - pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } } -} - -impl<T> Service<Request<Body>> for HyperServer<T> - where T: 'static + Send + Sync + HyperService -{ - type Response = Response<Body>; - type Error = hyper::Error; - type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; - - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request<Body>) -> Self::Future { - let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap(); - if content_type == "application/protobuf" { - Box::pin(async move { - let status = StatusCode::UNSUPPORTED_MEDIA_TYPE; - Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp()) - }) - } else { - let service = self.service.clone(); - Box::pin(async move { - let request = ServiceRequest::from_hyper_raw(req).await; - let resp = service.handle(request.unwrap()).await; - resp.map(|v| v.to_hyper_raw()) - .or_else(|err| match err.root_err() { - DBProstError::ProstDecodeError(_) => - Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body"). - to_hyper_resp()), - DBProstError::DBWrapError(err) => - Ok(err.to_hyper_resp()), - DBProstError::HyperError(err) => - Err(err), - _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error"). - to_hyper_resp()), - }) - }) - } - } -} \ No newline at end of file
