This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/poc-transport by this push:
new 7c0303d feat(protocal): adding triple protocol implement
new f1f7be8 Merge pull request #25 from Ticsmtc/adding-jsonrpc-example
7c0303d is described below
commit 7c0303df2ff22bd5d0cb166e584f251475f34424
Author: even <[email protected]>
AuthorDate: Thu Aug 4 20:46:25 2022 +0800
feat(protocal): adding triple protocol implement
---
dubbo-rust-protocol/Cargo.toml | 5 +-
dubbo-rust-protocol/src/jsonrpc/server.rs | 48 +-------------
dubbo-rust-protocol/src/lib.rs | 99 +++++++++++++++++++++++++++++
dubbo-rust-protocol/src/triple/codec.rs | 51 +++++++++++++++
dubbo-rust-protocol/src/triple/mod.rs | 3 +
dubbo-rust-protocol/src/triple/server.rs | 101 ++++++++++++++++++++++++++++++
dubbo-rust-protocol/src/triple/service.rs | 74 ++++++++++++++++++++++
7 files changed, 334 insertions(+), 47 deletions(-)
diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
index 3505f97..c3650e6 100644
--- a/dubbo-rust-protocol/Cargo.toml
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -14,4 +14,7 @@ tokio = "1.19.2"
tower = { version = "0.4.12", features = ["util"] }
futures = "0.3.21"
log = "0.4.17"
-http = "0.2.8"
\ No newline at end of file
+http = "0.2.8"
+http-body = "*"
+prost = "0.10.4"
+async-trait = "*"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs
index 4e6e51f..6aad022 100644
--- a/dubbo-rust-protocol/src/jsonrpc/server.rs
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -30,18 +30,12 @@ use log::trace;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
+use super::super::{wrap_future, OneConnection, SrvFut, StdError};
use super::Request as JsonRpcRequest;
use super::Response as JsonRpcResponse;
use crate::NamedService;
use tower::util::BoxCloneService;
-fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
-where
- F: Future<Output = Result<R, E>> + Send + 'static,
-{
- Box::pin(fut)
-}
-
pin_project! {
pub struct JsonRpcServer<S> {
#[pin]
@@ -73,35 +67,6 @@ impl<S> JsonRpcServer<S> {
}
}
-type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
-
-pin_project! {
- struct OneConnection<IO,S>
- where S: tower::Service<HttpRequest<Body>,Response = HttpResponse<Body>,Error = StdError, Future = SrvFut<HttpResponse<Body>,StdError>>
- {
- #[pin]
- connection: Connection<IO,S>
- }
-}
-
-impl<IO, S> Future for OneConnection<IO, S>
-where
- S: tower::Service<
- HttpRequest<Body>,
- Response = HttpResponse<Body>,
- Error = StdError,
- Future = SrvFut<HttpResponse<Body>, StdError>,
- > + Unpin,
- IO: AsyncRead + AsyncWrite + Unpin,
-{
- type Output = Result<(), hyper::Error>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
- self.project().connection.poll_without_shutdown(cx)
- }
-}
-
-type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
impl<S> Future for JsonRpcServer<S>
where
S: tower::Service<
@@ -224,16 +189,7 @@ impl JsonRpcServiceBuilder {
}
}
-impl tower::Service<HttpRequest<Body>> for JsonRpcService
-// where
-// S: tower::Service<
-// JsonRpcRequest,
-// Response = JsonRpcResponse,
-// Error = StdError,
-// Future = SrvFut<JsonRpcResponse, StdError>,
-// >,
-// S: Clone + Send + 'static,
-{
+impl tower::Service<HttpRequest<Body>> for JsonRpcService {
type Response = HttpResponse<Body>;
type Error = StdError;
diff --git a/dubbo-rust-protocol/src/lib.rs b/dubbo-rust-protocol/src/lib.rs
index 9565a85..b85f1a6 100644
--- a/dubbo-rust-protocol/src/lib.rs
+++ b/dubbo-rust-protocol/src/lib.rs
@@ -1,5 +1,104 @@
+use http::request::Request as HttpRequest;
+use http::response::Response as HttpResponse;
+use hyper::server::conn::Connection;
+use hyper::Body;
+use pin_project_lite::pin_project;
+use std::any::Any;
+use std::pin::Pin;
+use std::task::Poll;
+use std::{convert::Infallible, future::Future};
+use tokio::io::{AsyncRead, AsyncWrite};
+
pub mod jsonrpc;
+pub mod triple;
+
+pub(crate) fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
+where
+ F: Future<Output = Result<R, E>> + Send + 'static,
+{
+ Box::pin(fut)
+}
+
+type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
pub trait NamedService {
const SERVICE_NAME: &'static str;
}
+
+pin_project! {
+ struct OneConnection<IO,S>
+ where S: tower::Service<HttpRequest<Body>,Response = HttpResponse<Body>,Error = StdError, Future = SrvFut<HttpResponse<Body>,StdError>>
+ {
+ #[pin]
+ connection: Connection<IO,S>
+ }
+}
+
+type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+impl<IO, S> Future for OneConnection<IO, S>
+where
+ S: tower::Service<
+ HttpRequest<Body>,
+ Response = HttpResponse<Body>,
+ Error = StdError,
+ Future = SrvFut<HttpResponse<Body>, StdError>,
+ > + Unpin,
+ IO: AsyncRead + AsyncWrite + Unpin,
+{
+ type Output = Result<(), hyper::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+ self.project().connection.poll_without_shutdown(cx)
+ }
+}
+
+// codec define
+pub trait Codec {
+ fn decode<T>(req: HttpRequest<Body>) -> Result<T, Infallible>;
+ fn encode<T>(req: T) -> Result<HttpResponse<Body>, Infallible>;
+}
+
+pub struct TripleRequest {
+ inner: Box<dyn Any + 'static + Send>,
+}
+
+impl TripleRequest {
+ pub fn from<T: 'static + Send>(typ: T) -> Self {
+ Self {
+ inner: Box::new(typ),
+ }
+ }
+
+ pub fn downcast<T: 'static>(self) -> Box<T> {
+ self.inner.downcast().unwrap()
+ }
+}
+
+pub struct TripleResponse {
+ inner: Box<dyn Any + 'static + Send>,
+}
+
+impl TripleResponse {
+ pub fn from<T: 'static + Send>(typ: T) -> Self {
+ Self {
+ inner: Box::new(typ),
+ }
+ }
+
+ pub fn downcast<T: 'static>(self) -> Box<T> {
+ self.inner.downcast().unwrap()
+ }
+}
+
+pub trait Api: Clone + Send + 'static + Sync {
+ fn call_method(
+ &self,
+ method: &str,
+ request: TripleRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<TripleResponse, ()>> + Send + 'static>>;
+
+ fn decode(&self, method: &str, req: HttpRequest<Body>) -> Result<TripleRequest, ()>;
+
+ fn encode(&self, method: &str, resp: TripleResponse) -> Result<HttpResponse<Body>, ()>;
+}
diff --git a/dubbo-rust-protocol/src/triple/codec.rs b/dubbo-rust-protocol/src/triple/codec.rs
new file mode 100644
index 0000000..47a3c2e
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/codec.rs
@@ -0,0 +1,51 @@
+use std::{convert::Infallible, marker::PhantomData};
+
+use http::HeaderMap;
+// triple codec
+use http::request::Request as HttpRequest;
+use http::response::Response as HttpResponse;
+use hyper::body::HttpBody;
+use hyper::Body;
+
+pub struct TripleCodec;
+
+impl TripleCodec {
+ // decode from http to triple request
+ pub async fn decode_from_http(req: HttpRequest<Body>) -> Result<REQ, ()> {
+ let mut req = Box::pin(req);
+ match req.data().await {
+ Some(Ok(b)) => {
+ let msg: Result<REQ, _> = prost::Message::decode(&b[5..]);
+ if let Ok(msg) = msg {
+ return Ok(msg);
+ }
+ }
+ _ => {}
+ }
+
+ return Err(());
+ }
+
+ // encode from triple response to http
+ pub async fn encode_to_http(req: super::service::) -> Result<HttpResponse<Body>, ()> {
+ let mut buffer = Vec::<u8>::new();
+ if let Ok(_) = req.encode(&mut buffer) {
+ buffer.insert(0, buffer.len() as u8);
+ for _ in 0..4 {
+ buffer.insert(0, 0u8);
+ }
+ }
+
+ let (mut trailer, inner) = Body::channel();
+
+ let mut headers = HeaderMap::new();
+ headers.insert("grpc-status", 0i32.into());
+
+ trailer.send_data(buffer.into()).await.unwrap();
+ trailer.send_trailers(headers).await.unwrap();
+
+ let resp = HttpResponse::builder().status(200).body(inner).unwrap();
+
+ return Ok(resp);
+ }
+}
diff --git a/dubbo-rust-protocol/src/triple/mod.rs b/dubbo-rust-protocol/src/triple/mod.rs
new file mode 100644
index 0000000..8d18839
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/mod.rs
@@ -0,0 +1,3 @@
+pub mod codec;
+pub mod server;
+pub mod service;
diff --git a/dubbo-rust-protocol/src/triple/server.rs b/dubbo-rust-protocol/src/triple/server.rs
new file mode 100644
index 0000000..bf60902
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/server.rs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::future::Future;
+use std::{net::SocketAddr, pin::Pin, task::Poll};
+
+use futures::ready;
+use http::{Request as HttpRequest, Response as HttpResponse};
+use hyper::server::accept::Accept;
+use hyper::server::conn::{AddrIncoming, AddrStream};
+use hyper::server::conn::{Connection, Http};
+use hyper::Body;
+use log::trace;
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::super::{wrap_future, OneConnection, SrvFut, StdError};
+use crate::NamedService;
+use tower::util::BoxCloneService;
+
+pin_project! {
+ pub struct TripleRpcServer<S> {
+ #[pin]
+ incoming: AddrIncoming,
+ rt_handle: tokio::runtime::Handle,
+ service: S,
+ }
+}
+
+impl<S> TripleRpcServer<S> {
+ pub fn new(addr: &SocketAddr, handle: tokio::runtime::Handle, service: S) -> Self
+ where
+ S: tower::Service<HttpRequest<Body>> + Clone,
+ {
+ let incoming = AddrIncoming::bind(addr).unwrap();
+ Self {
+ incoming: incoming,
+ rt_handle: handle,
+ service,
+ }
+ }
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Result<AddrStream, std::io::Error>>> {
+ let me = self.project();
+ me.incoming.poll_accept(cx)
+ }
+}
+
+impl<S> Future for TripleRpcServer<S>
+where
+ S: tower::Service<
+ HttpRequest<Body>,
+ Response = HttpResponse<Body>,
+ Error = StdError,
+ Future = SrvFut<HttpResponse<Body>, StdError>,
+ >,
+ S: Clone + Send + 'static + Unpin,
+{
+ type Output = Result<(), StdError>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ loop {
+ let ret = ready!(self.as_mut().poll_next(cx));
+ match ret {
+ Some(Ok(stream)) => {
+ trace!("Get conn {}", stream.remote_addr());
+
+ let connection = Http::new()
+ .http2_only(true)
+ .http2_enable_connect_protocol()
+ .serve_connection(stream, self.service.clone());
+
+ let one_conn = OneConnection { connection };
+ self.rt_handle.spawn(one_conn);
+ }
+ Some(Err(e)) => return Poll::Ready(Err(e.into())),
+ None => return Poll::Ready(Err("option none".into())),
+ }
+ }
+ }
+}
diff --git a/dubbo-rust-protocol/src/triple/service.rs b/dubbo-rust-protocol/src/triple/service.rs
new file mode 100644
index 0000000..267fdcd
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/service.rs
@@ -0,0 +1,74 @@
+use hyper::Body;
+use hyper::Request as HttpRequest;
+use hyper::Response as HttpResponse;
+use tower::Service;
+
+use crate::Api;
+use crate::SrvFut;
+use crate::StdError;
+
+pub struct TripleService<S> {
+ inner: S,
+}
+
+impl<S: Api> TripleService<S> {
+ pub fn new(srv: S) -> Self {
+ Self { inner: srv }
+ }
+}
+
+impl<S: Api> Service<HttpRequest<Body>> for TripleService<S> {
+ type Response = HttpResponse<Body>;
+
+ type Error = StdError;
+
+ type Future = SrvFut<Self::Response, Self::Error>;
+
+ fn poll_ready(
+ &mut self,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ std::task::Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: HttpRequest<Body>) -> Self::Future {
+ let method_not_found = || {
+ Box::pin(async move {
+ Ok(HttpResponse::builder()
+ .status(404)
+ .body(Body::empty())
+ .unwrap())
+ })
+ };
+ let method_name = req.uri().path().split("/").last();
+ if method_name.is_none() {
+ return method_not_found();
+ }
+ let method_name = method_name.unwrap();
+
+ let inner_service = self.inner.clone();
+ Box::pin(async move {
+ let fail = Ok(HttpResponse::builder()
+ .status(500)
+ .body(Body::empty())
+ .unwrap());
+
+ if let Ok(req) = inner_service.decode(method_name, req) {
+ if let Ok(resp) = inner_service.call_method(method_name, req).await {
+ if let Ok(http_resp) = inner_service.encode(method_name, resp) {
+ Ok(HttpResponse::builder()
+ .status(200)
+ .body(Body::empty())
+ .unwrap())
+ } else {
+ return fail;
+ }
+ } else {
+ return fail;
+ }
+ } else {
+ return fail;
+ }
+ })
+ }
+}