This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-transport in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 209757d13b4a2cf4e43b38b7555d6be3e49fe4a5 Author: even <[email protected]> AuthorDate: Fri Jun 17 15:02:25 2022 +0800 feat(protocol) : Adding jsonrpc protocol . --- dubbo-rust-protocol/Cargo.toml | 17 +++ dubbo-rust-protocol/src/jsonrpc/client.rs | 16 +++ dubbo-rust-protocol/src/jsonrpc/mod.rs | 165 +++++++++++++++++++++++ dubbo-rust-protocol/src/jsonrpc/server.rs | 214 ++++++++++++++++++++++++++++++ dubbo-rust-protocol/src/lib.rs | 5 + 5 files changed, 417 insertions(+) diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml new file mode 100644 index 0000000..6e108ea --- /dev/null +++ b/dubbo-rust-protocol/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "dubbo-rust-protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0.137", features = ["derive"] } +serde_json = { version = "1.0.81" } +pin-project-lite = { version = "0.2.9" } +hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] } +tokio = "1.19.2" +tower = "0.4.12" +futures = "0.3.21" +log = "0.4.17" +http = "0.2.8" \ No newline at end of file diff --git a/dubbo-rust-protocol/src/jsonrpc/client.rs b/dubbo-rust-protocol/src/jsonrpc/client.rs new file mode 100644 index 0000000..2944f98 --- /dev/null +++ b/dubbo-rust-protocol/src/jsonrpc/client.rs @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ diff --git a/dubbo-rust-protocol/src/jsonrpc/mod.rs b/dubbo-rust-protocol/src/jsonrpc/mod.rs new file mode 100644 index 0000000..fab67d6 --- /dev/null +++ b/dubbo-rust-protocol/src/jsonrpc/mod.rs @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod client; +pub mod server; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::json; + +#[derive(Debug, Clone)] +pub struct Header { + pub jsonrpc: Option<String>, + pub id: Option<String>, + pub method: Option<String>, +} + +pub struct Request { + pub header: Header, + pub params: serde_json::Value, +} +impl Request { + fn from_slice(slice: Vec<u8>) -> Result<Self, serde_json::Error> { + let pre: serde_json::map::Map<String, serde_json::Value> = serde_json::from_slice(&slice)?; + + let header = Header { + jsonrpc: Self::get_json_value(pre.get("jsonrpc"))?, + id: Self::get_json_value(pre.get("id"))?, + method: Self::get_json_value(pre.get("method"))?, + }; + + let params = serde_json::from_value(pre.get("params").unwrap_or(&json!({})).clone())?; + + Ok(Self { header, params }) + } + + fn get_json_value<T: DeserializeOwned>( + value: Option<&serde_json::Value>, + ) -> Result<Option<T>, serde_json::Error> { + match value { + None => Ok(None), + Some(v) => { + let ret: T = serde_json::from_value(v.clone())?; + + Ok(Some(ret)) + } + } + } + + pub fn new<T: Serialize>(method_name: &str, req: T) -> Result<Self, serde_json::Error> { + Ok(Self { + header: Header { + jsonrpc: Some("2.0".to_string()), + id: Some("1".to_string()), + method: Some(method_name.to_string()), + }, + params: serde_json::to_value(&req)?, + }) + } + + pub fn to_string(&self) -> Result<String, serde_json::Error> { + Ok(serde_json::to_string(&json!({ + "jsonrpc": self.header.jsonrpc, + "id": self.header.id, + "method": self.header.method, + "params": self.params + }))?) + } +} + +pub struct Response { + header: Header, + error: Option<Error>, + result: Option<serde_json::Value>, +} + +impl Response { + pub fn from_request<B: Serialize>( + request: &Request, + result: B, + ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> { + let p = serde_json::to_value(result)?; + Ok(Self { + header: request.header.clone(), + error: None, + result: Some(p), + }) + } + + pub fn from_request_error(request: &Request, error: Error) -> Self { + Self { + header: request.header.clone(), + error: Some(error), + result: None, + } + } + + pub fn to_string(&self) -> Result<String, serde_json::Error> { + let ret = json!({ + "jsonrpc": self.header.jsonrpc, + "id": self.header.id, + "method": self.header.method, + "result": self.result, + "error": self.error + }); + + Ok(serde_json::to_string(&ret)?) + } + + pub fn from_slice(slice: &Vec<u8>) -> Result<Self, serde_json::Error> { + let pre: serde_json::map::Map<String, serde_json::Value> = serde_json::from_slice(&slice)?; + + let header = Header { + jsonrpc: Self::get_json_value(pre.get("jsonrpc"))?, + id: Self::get_json_value(pre.get("id"))?, + method: Self::get_json_value(pre.get("method"))?, + }; + + let result = serde_json::from_value(pre.get("result").unwrap_or(&json!({})).clone())?; + let error = serde_json::from_value(pre.get("error").unwrap_or(&json!({})).clone())?; + Ok(Self { + header, + result, + error, + }) + } + + pub fn get_body<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> { + Ok(serde_json::from_value( + self.result.as_ref().unwrap_or(&json!({})).clone(), + )?) + } + + fn get_json_value<T: DeserializeOwned>( + value: Option<&serde_json::Value>, + ) -> Result<Option<T>, serde_json::Error> { + match value { + None => Ok(None), + Some(v) => { + let ret: T = serde_json::from_value(v.clone())?; + + Ok(Some(ret)) + } + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Error { + pub code: i64, + pub message: String, +} diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs new file mode 100644 index 0000000..ed87953 --- /dev/null +++ b/dubbo-rust-protocol/src/jsonrpc/server.rs @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::future::Future; +use std::{net::SocketAddr, pin::Pin, task::Poll}; + +use futures::ready; +use http::{Request as HttpRequest, Response as HttpResponse}; +use hyper::body::HttpBody; +use hyper::server::accept::Accept; +use hyper::server::conn::{AddrIncoming, AddrStream}; +use hyper::server::conn::{Connection, Http}; +use hyper::Body; +use log::trace; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; + +use super::Request as JsonRpcRequest; +use super::Response as JsonRpcResponse; + +fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E> +where + F: Future<Output = Result<R, E>> + Send + 'static, +{ + Box::pin(fut) +} + +pin_project! { + pub struct JsonRpcServer<S> { + #[pin] + incoming: AddrIncoming, + rt_handle: tokio::runtime::Handle, + service: S + } +} + +impl<S> JsonRpcServer<S> { + pub fn new(addr: &SocketAddr, handle: tokio::runtime::Handle, service: S) -> Self + where + S: tower::Service<HttpRequest<Body>> + Clone, + { + let incoming = AddrIncoming::bind(addr).unwrap(); + Self { + incoming: incoming, + rt_handle: handle, + service, + } + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Result<AddrStream, std::io::Error>>> { + let me = self.project(); + me.incoming.poll_accept(cx) + } +} + +type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>; + +pin_project! { + struct OneConnection<IO,S> + where S: tower::Service<HttpRequest<Body>,Response = HttpResponse<Body>,Error = StdError, Future = SrvFut<HttpResponse<Body>,StdError>> + { + #[pin] + connection: Connection<IO,S> + } +} + +impl<IO, S> Future for OneConnection<IO, S> +where + S: tower::Service< + HttpRequest<Body>, + Response = HttpResponse<Body>, + Error = StdError, + Future = SrvFut<HttpResponse<Body>, StdError>, + > + Unpin, + IO: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(), hyper::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + self.project().connection.poll_without_shutdown(cx) + } +} + +type StdError = Box<dyn std::error::Error + Send + Sync + 'static>; +impl<S> Future for JsonRpcServer<S> +where + S: tower::Service< + HttpRequest<Body>, + Response = HttpResponse<Body>, + Error = StdError, + Future = SrvFut<HttpResponse<Body>, StdError>, + >, + S: Clone + Send + 'static + Unpin, +{ + type Output = Result<(), StdError>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + loop { + let ret = ready!(self.as_mut().poll_next(cx)); + match ret { + Some(Ok(stream)) => { + trace!("Get conn {}", stream.remote_addr()); + + let connection = Http::new() + .http1_only(true) + .http1_keep_alive(true) + .serve_connection(stream, self.service.clone()); + + let one_conn = OneConnection { connection }; + self.rt_handle.spawn(one_conn); + } + Some(Err(e)) => return Poll::Ready(Err(e.into())), + None => return Poll::Ready(Err("option none".into())), + } + } + } +} + +//////////////////////////////////// + +#[derive(Clone)] +pub struct JsonRpcService<S> { + service: S, +} + +impl<S> JsonRpcService<S> { + pub fn new(service: S) -> Self + where + S: tower::Service< + JsonRpcRequest, + Response = JsonRpcResponse, + Error = StdError, + Future = SrvFut<JsonRpcResponse, StdError>, + >, + S: Clone + Send + 'static, + { + Self { service: service } + } +} + +impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S> +where + S: tower::Service< + JsonRpcRequest, + Response = JsonRpcResponse, + Error = StdError, + Future = SrvFut<JsonRpcResponse, StdError>, + >, + S: Clone + Send + 'static, +{ + type Response = HttpResponse<Body>; + + type Error = StdError; + + type Future = SrvFut<Self::Response, Self::Error>; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future { + // serde + let mut inner_service = self.service.clone(); + wrap_future(async move { + if let Some(data) = req.data().await { + if let Err(ref e) = data { + trace!("Get body error {}", e); + } + let data = data?; + + let request = JsonRpcRequest::from_slice(data.to_vec()); + + if let Err(ref e) = request { + trace!("Serde error {}", e); + } + let request = request?; + + let fut = inner_service.call(request); + let res = fut.await?; + + let response_string = res.to_string()?; + + return Ok(HttpResponse::builder() + .body(response_string.into()) + .unwrap()); + } else { + trace!("none"); + } + + trace!("get req {:?}", req); + Ok(HttpResponse::builder().body(Body::empty()).unwrap()) + }) + } +} diff --git a/dubbo-rust-protocol/src/lib.rs b/dubbo-rust-protocol/src/lib.rs new file mode 100644 index 0000000..9565a85 --- /dev/null +++ b/dubbo-rust-protocol/src/lib.rs @@ -0,0 +1,5 @@ +pub mod jsonrpc; + +pub trait NamedService { + const SERVICE_NAME: &'static str; +}
