This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-transport in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 457941d7cfa857e332d8ad6041b3df161a45d93e Author: even <[email protected]> AuthorDate: Fri Jun 17 15:03:20 2022 +0800 feat(examples) : Adding jsonrpc-basic example . --- dubbo-rust-examples/Cargo.toml | 25 ++++ .../src/jsonrpc-basic/addservice.rs | 156 +++++++++++++++++++++ dubbo-rust-examples/src/jsonrpc-basic/client.rs | 37 +++++ dubbo-rust-examples/src/jsonrpc-basic/server.rs | 54 +++++++ 4 files changed, 272 insertions(+) diff --git a/dubbo-rust-examples/Cargo.toml b/dubbo-rust-examples/Cargo.toml index 5013bb9..3078752 100644 --- a/dubbo-rust-examples/Cargo.toml +++ b/dubbo-rust-examples/Cargo.toml @@ -6,3 +6,28 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { version = "0.1.56" } +serde = { version = "1.0.137", features = ["derive"] } +serde_json = { version = "1.0.81" } +tower = { version = "0.4.12" } +env_logger = "0.9.0" +log = "0.4.17" +tokio = { version = "1.19.2", features = ["full"] } +hyper = { version = "0.14.19", features = ["tcp","client","http1"] } + +dubbo-rust-protocol = { path = "../dubbo-rust-protocol" } + + + + +#jsonrpc-basic + +[[bin]] +name = "jsonrpc-basic-client" +path = "src/jsonrpc-basic/client.rs" + + +[[bin]] +name = "jsonrpc-basic-server" +path = "src/jsonrpc-basic/server.rs" + diff --git a/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs new file mode 100644 index 0000000..91f554b --- /dev/null +++ b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Hand-written for now; to be generated by the `dubbo-rust-build` crate (WIP).
+
+// Both example binaries include this module, but each uses only one half of it.
+#![allow(dead_code)]
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::{future::Future, pin::Pin};
+
+/// Boxed error type shared by the service trait, the server adapter and the client.
+pub type StdError = Box<(dyn std::error::Error + Send + Sync + 'static)>;
+/// Boxed, `Send` future resolving to `Result<R, E>`.
+pub type BoxFuture<R, E> = Pin<Box<(dyn Future<Output = Result<R, E>> + Send + 'static)>>;
+
+/// Parameters of the `add` method.
+#[derive(Debug, Deserialize, Serialize)]
+pub struct AddReq {
+    pub numbers: Vec<i32>,
+}
+
+/// Result of the `add` method.
+pub type AddResp = i32;
+
+/// Business interface that the server adapter dispatches `add` requests to.
+#[async_trait]
+pub trait AddService {
+    async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError>;
+}
+
+pub mod add_service {
+    use std::task::{Context, Poll};
+
+    use dubbo_rust_protocol::{
+        jsonrpc::{Request, Response},
+        NamedService,
+    };
+
+    use super::{AddReq, AddService, BoxFuture, StdError};
+
+    /// `tower::Service` adapter exposing an [`AddService`] over JSON-RPC requests.
+    #[derive(Clone)]
+    pub struct AddServer<T: AddService + Clone> {
+        inner: T,
+    }
+
+    impl<T: AddService + Clone> AddServer<T> {
+        pub fn new(service: T) -> Self {
+            Self { inner: service }
+        }
+    }
+
+    impl<T: AddService + Clone> NamedService for AddServer<T> {
+        const SERVICE_NAME: &'static str = "AddService";
+    }
+
+    /// Answers a request whose method is missing or unknown with error -32601.
+    fn method_not_found(req: Request) -> BoxFuture<Response, StdError> {
+        Box::pin(async move {
+            let error = dubbo_rust_protocol::jsonrpc::Error {
+                code: -32601,
+                message: "Method not found".to_string(),
+            };
+            Ok(Response::from_request_error(&req, error))
+        })
+    }
+
+    impl<T> tower::Service<Request> for AddServer<T>
+    where
+        T: AddService + Clone + Send + 'static,
+    {
+        type Response = Response;
+        type Error = StdError;
+        type Future = BoxFuture<Self::Response, Self::Error>;
+
+        /// Always ready: the adapter applies no back-pressure of its own.
+        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+
+        /// Dispatches on the JSON-RPC method name; only `add` is implemented.
+        fn call(&mut self, req: Request) -> Self::Future {
+            match req.header.method.as_deref() {
+                Some("add") => {
+                    // The future must be `'static`, so it owns a clone of the service.
+                    let mut inner_service = self.inner.clone();
+                    Box::pin(async move {
+                        let params: AddReq = serde_json::from_value(req.params.clone())?;
+                        let result = inner_service.add(params).await?;
+                        Ok(Response::from_request(&req, result)?)
+                    })
+                }
+                _ => method_not_found(req),
+            }
+        }
+    }
+}
+
+pub mod add_client {
+    use std::net::SocketAddr;
+
+    use super::{AddReq, AddResp, StdError};
+    use dubbo_rust_protocol::jsonrpc::Request as JsonRpcRequest;
+    use dubbo_rust_protocol::jsonrpc::Response as JsonRpcResponse;
+    use hyper::client::HttpConnector;
+
+    /// JSON-RPC-over-HTTP client for the `add` method.
+    pub struct AddClient {
+        addr: SocketAddr,
+        http_client: hyper::Client<HttpConnector>,
+    }
+
+    impl AddClient {
+        pub fn new(addr: &SocketAddr) -> Result<Self, StdError> {
+            Ok(Self {
+                addr: *addr, // `SocketAddr` is `Copy`
+                http_client: hyper::Client::new(),
+            })
+        }
+
+        /// POSTs an `add` request to `addr` and decodes the JSON-RPC response body.
+        pub async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+            let req = JsonRpcRequest::new("add", req)?;
+            let req_str = req.to_string()?;
+            let http_request = hyper::Request::builder()
+                .method("POST")
+                .uri(format!("http://{}", self.addr))
+                .body(hyper::Body::from(req_str))?;
+
+            let resp = self.http_client.request(http_request).await?;
+            // Collect every frame; the first one alone may be a partial body.
+            let body = hyper::body::to_bytes(resp.into_body()).await?;
+            if body.is_empty() {
+                return Err("body empty".into());
+            }
+            let jsonrpc_resp = JsonRpcResponse::from_slice(&body.to_vec())?;
+            Ok(jsonrpc_resp.get_body()?)
+        }
+    }
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/client.rs b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
new file mode 100644
index 0000000..ef402ba
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+mod addservice;
+
+use std::net::SocketAddr;
+
+use addservice::{add_client::AddClient, AddReq, StdError};
+
+/// Sends one `add` request to 127.0.0.1:40021 and prints the decoded result.
+#[tokio::main]
+async fn main() -> Result<(), StdError> {
+    let addr: SocketAddr = "127.0.0.1:40021".parse()?;
+    let mut client = AddClient::new(&addr)?;
+
+    let req = AddReq {
+        numbers: vec![1, 2, 21],
+    };
+    let resp = client.add(req).await?;
+
+    println!("resp : {:?}", resp);
+    Ok(())
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/server.rs b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
new file mode 100644
index 0000000..79bd7ec
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dubbo_rust_protocol::jsonrpc::server::{JsonRpcServer, JsonRpcService};
+use log::info;
+use std::net::SocketAddr;
+
+mod addservice;
+
+use addservice::{add_service::AddServer, AddReq, AddResp, AddService, StdError};
+
+/// Stateless [`AddService`] implementation that sums the request's numbers.
+#[derive(Clone)]
+struct MyAdd;
+
+#[async_trait::async_trait]
+impl AddService for MyAdd {
+    async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+        info!("get request {:?}", req);
+        // The numbers come off the wire, so reject overflow rather than panic or wrap.
+        let sum = req.numbers.iter().try_fold(0i32, |acc, &n| acc.checked_add(n));
+        sum.ok_or_else(|| StdError::from("sum overflows i32"))
+    }
+}
+
+/// Serves `MyAdd` as a JSON-RPC service on 0.0.0.0:40021 and awaits the server future.
+#[tokio::main]
+async fn main() {
+    env_logger::builder()
+        .filter_level(log::LevelFilter::Info)
+        .init();
+
+    let addr: SocketAddr = "0.0.0.0:40021".parse().expect("valid literal socket address");
+    let rt = tokio::runtime::Handle::current();
+    let server = JsonRpcServer::new(&addr, rt, JsonRpcService::new(AddServer::new(MyAdd)));
+
+    info!("Server start at {}", addr);
+
+    server.await.unwrap();
+}
