This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-transport in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 69f64979a77338f52610e4cee8f296438d61b1ce Author: even <[email protected]> AuthorDate: Fri Jun 24 11:06:31 2022 +0800 feat(protocol): Adding service router support. --- dubbo-rust-protocol/Cargo.toml | 2 +- dubbo-rust-protocol/src/jsonrpc/server.rs | 110 +++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 18 deletions(-) diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml index 6e108ea..3505f97 100644 --- a/dubbo-rust-protocol/Cargo.toml +++ b/dubbo-rust-protocol/Cargo.toml @@ -11,7 +11,7 @@ serde_json = { version = "1.0.81" } pin-project-lite = { version = "0.2.9" } hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] } tokio = "1.19.2" -tower = "0.4.12" +tower = { version = "0.4.12", features = ["util"] } futures = "0.3.21" log = "0.4.17" http = "0.2.8" \ No newline at end of file diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs index ed87953..4e6e51f 100644 --- a/dubbo-rust-protocol/src/jsonrpc/server.rs +++ b/dubbo-rust-protocol/src/jsonrpc/server.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use std::collections::HashMap; use std::future::Future; use std::{net::SocketAddr, pin::Pin, task::Poll}; @@ -31,6 +32,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use super::Request as JsonRpcRequest; use super::Response as JsonRpcResponse; +use crate::NamedService; +use tower::util::BoxCloneService; fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E> where @@ -139,12 +142,21 @@ where //////////////////////////////////// #[derive(Clone)] -pub struct JsonRpcService<S> { - service: S, +pub struct JsonRpcService { + // service: HashMap<String, Box<S>>, + service: HashMap<String, BoxCloneService<JsonRpcRequest, JsonRpcResponse, StdError>>, } -impl<S> JsonRpcService<S> { - pub fn new(service: S) -> Self +pub struct JsonRpcServiceBuilder { + ident: Option<JsonRpcService>, +} + +impl JsonRpcService { + pub fn builder() -> JsonRpcServiceBuilder { + JsonRpcServiceBuilder { ident: None } + } + + pub fn new<S>(service: S) -> Self where S: tower::Service< JsonRpcRequest, @@ -152,21 +164,75 @@ impl<S> JsonRpcService<S> { Error = StdError, Future = SrvFut<JsonRpcResponse, StdError>, >, - S: Clone + Send + 'static, + S: Clone + Send + Sync + 'static, + S: NamedService, { - Self { service: service } + let mut mm = HashMap::new(); + + mm.insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service)); + + Self { service: mm } + } + + pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError> + where + S: tower::Service< + JsonRpcRequest, + Response = JsonRpcResponse, + Error = StdError, + Future = SrvFut<JsonRpcResponse, StdError>, + >, + S: Clone + Send + Sync + 'static, + S: NamedService, + { + if self.service.contains_key(S::SERVICE_NAME) { + return Err(format!("dupplicate service name {}", S::SERVICE_NAME).into()); + } + + self.service + .insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service)); + Ok(()) } } -impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S> -where - S: tower::Service< - JsonRpcRequest, - Response = JsonRpcResponse, - Error = StdError, - Future = SrvFut<JsonRpcResponse, StdError>, - >, - S: Clone + Send + 'static, +impl JsonRpcServiceBuilder { + pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError> + where + S: tower::Service< + JsonRpcRequest, + Response = JsonRpcResponse, + Error = StdError, + Future = SrvFut<JsonRpcResponse, StdError>, + >, + S: Clone + Send + Sync + 'static, + S: NamedService, + { + if self.ident.is_none() { + self.ident.replace(JsonRpcService::new(service)); + return Ok(()); + } + + self.ident.as_mut().unwrap().add_service(service) + } + + pub fn build(mut self) -> Result<JsonRpcService, StdError> { + if self.ident.is_none() { + return Err("nothing build".into()); + } + + Ok(self.ident.take().unwrap()) + } +} + +impl tower::Service<HttpRequest<Body>> for JsonRpcService +// where +// S: tower::Service< +// JsonRpcRequest, +// Response = JsonRpcResponse, +// Error = StdError, +// Future = SrvFut<JsonRpcResponse, StdError>, +// >, +// S: Clone + Send + 'static, { type Response = HttpResponse<Body>; @@ -179,8 +245,18 @@ where } fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future { - // serde - let mut inner_service = self.service.clone(); + // fetch service + let service_name = req + .uri() + .path() + .trim_start_matches("/") + .trim_end_matches("/"); + + if !self.service.contains_key(service_name) { + return wrap_future(async { Ok(HttpResponse::builder().body(Body::empty()).unwrap()) }); + } + + let mut inner_service = self.service.get(service_name).unwrap().clone(); wrap_future(async move { if let Some(data) = req.data().await { if let Err(ref e) = data {
