This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/poc-transport by this push:
     new 7c0303d  feat(protocal): adding triple protocol implement
     new f1f7be8  Merge pull request #25 from Ticsmtc/adding-jsonrpc-example
7c0303d is described below

commit 7c0303df2ff22bd5d0cb166e584f251475f34424
Author: even <[email protected]>
AuthorDate: Thu Aug 4 20:46:25 2022 +0800

    feat(protocal): adding triple protocol implement
---
 dubbo-rust-protocol/Cargo.toml            |   5 +-
 dubbo-rust-protocol/src/jsonrpc/server.rs |  48 +-------------
 dubbo-rust-protocol/src/lib.rs            |  99 +++++++++++++++++++++++++++++
 dubbo-rust-protocol/src/triple/codec.rs   |  51 +++++++++++++++
 dubbo-rust-protocol/src/triple/mod.rs     |   3 +
 dubbo-rust-protocol/src/triple/server.rs  | 101 ++++++++++++++++++++++++++++++
 dubbo-rust-protocol/src/triple/service.rs |  74 ++++++++++++++++++++++
 7 files changed, 334 insertions(+), 47 deletions(-)

diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
index 3505f97..c3650e6 100644
--- a/dubbo-rust-protocol/Cargo.toml
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -14,4 +14,7 @@ tokio = "1.19.2"
 tower = { version = "0.4.12", features = ["util"] }
 futures = "0.3.21"
 log = "0.4.17"
-http = "0.2.8"
\ No newline at end of file
+http = "0.2.8"
+http-body = "*"
+prost = "0.10.4"
+async-trait = "*"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs 
b/dubbo-rust-protocol/src/jsonrpc/server.rs
index 4e6e51f..6aad022 100644
--- a/dubbo-rust-protocol/src/jsonrpc/server.rs
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -30,18 +30,12 @@ use log::trace;
 use pin_project_lite::pin_project;
 use tokio::io::{AsyncRead, AsyncWrite};
 
+use super::super::{wrap_future, OneConnection, SrvFut, StdError};
 use super::Request as JsonRpcRequest;
 use super::Response as JsonRpcResponse;
 use crate::NamedService;
 use tower::util::BoxCloneService;
 
-fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
-where
-    F: Future<Output = Result<R, E>> + Send + 'static,
-{
-    Box::pin(fut)
-}
-
 pin_project! {
    pub struct JsonRpcServer<S> {
         #[pin]
@@ -73,35 +67,6 @@ impl<S> JsonRpcServer<S> {
     }
 }
 
-type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 
'static>>;
-
-pin_project! {
-    struct OneConnection<IO,S>
-    where S: tower::Service<HttpRequest<Body>,Response = 
HttpResponse<Body>,Error = StdError, Future = 
SrvFut<HttpResponse<Body>,StdError>>
-    {
-        #[pin]
-        connection: Connection<IO,S>
-    }
-}
-
-impl<IO, S> Future for OneConnection<IO, S>
-where
-    S: tower::Service<
-            HttpRequest<Body>,
-            Response = HttpResponse<Body>,
-            Error = StdError,
-            Future = SrvFut<HttpResponse<Body>, StdError>,
-        > + Unpin,
-    IO: AsyncRead + AsyncWrite + Unpin,
-{
-    type Output = Result<(), hyper::Error>;
-
-    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
Poll<Self::Output> {
-        self.project().connection.poll_without_shutdown(cx)
-    }
-}
-
-type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
 impl<S> Future for JsonRpcServer<S>
 where
     S: tower::Service<
@@ -224,16 +189,7 @@ impl JsonRpcServiceBuilder {
     }
 }
 
-impl tower::Service<HttpRequest<Body>> for JsonRpcService
-// where
-// S: tower::Service<
-//     JsonRpcRequest,
-//     Response = JsonRpcResponse,
-//     Error = StdError,
-//     Future = SrvFut<JsonRpcResponse, StdError>,
-// >,
-// S: Clone + Send + 'static,
-{
+impl tower::Service<HttpRequest<Body>> for JsonRpcService {
     type Response = HttpResponse<Body>;
 
     type Error = StdError;
diff --git a/dubbo-rust-protocol/src/lib.rs b/dubbo-rust-protocol/src/lib.rs
index 9565a85..b85f1a6 100644
--- a/dubbo-rust-protocol/src/lib.rs
+++ b/dubbo-rust-protocol/src/lib.rs
@@ -1,5 +1,104 @@
+use http::request::Request as HttpRequest;
+use http::response::Response as HttpResponse;
+use hyper::server::conn::Connection;
+use hyper::Body;
+use pin_project_lite::pin_project;
+use std::any::Any;
+use std::pin::Pin;
+use std::task::Poll;
+use std::{convert::Infallible, future::Future};
+use tokio::io::{AsyncRead, AsyncWrite};
+
 pub mod jsonrpc;
+pub mod triple;
+
+pub(crate) fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
+where
+    F: Future<Output = Result<R, E>> + Send + 'static,
+{
+    Box::pin(fut)
+}
+
+type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 
'static>>;
 
 pub trait NamedService {
     const SERVICE_NAME: &'static str;
 }
+
+pin_project! {
+    struct OneConnection<IO,S>
+    where S: tower::Service<HttpRequest<Body>,Response = 
HttpResponse<Body>,Error = StdError, Future = 
SrvFut<HttpResponse<Body>,StdError>>
+    {
+        #[pin]
+        connection: Connection<IO,S>
+    }
+}
+
+type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+impl<IO, S> Future for OneConnection<IO, S>
+where
+    S: tower::Service<
+            HttpRequest<Body>,
+            Response = HttpResponse<Body>,
+            Error = StdError,
+            Future = SrvFut<HttpResponse<Body>, StdError>,
+        > + Unpin,
+    IO: AsyncRead + AsyncWrite + Unpin,
+{
+    type Output = Result<(), hyper::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
Poll<Self::Output> {
+        self.project().connection.poll_without_shutdown(cx)
+    }
+}
+
+// codec define
+pub trait Codec {
+    fn decode<T>(req: HttpRequest<Body>) -> Result<T, Infallible>;
+    fn encode<T>(req: T) -> Result<HttpResponse<Body>, Infallible>;
+}
+
+pub struct TripleRequest {
+    inner: Box<dyn Any + 'static + Send>,
+}
+
+impl TripleRequest {
+    pub fn from<T: 'static + Send>(typ: T) -> Self {
+        Self {
+            inner: Box::new(typ),
+        }
+    }
+
+    pub fn downcast<T: 'static>(self) -> Box<T> {
+        self.inner.downcast().unwrap()
+    }
+}
+
+pub struct TripleResponse {
+    inner: Box<dyn Any + 'static + Send>,
+}
+
+impl TripleResponse {
+    pub fn from<T: 'static + Send>(typ: T) -> Self {
+        Self {
+            inner: Box::new(typ),
+        }
+    }
+
+    pub fn downcast<T: 'static>(self) -> Box<T> {
+        self.inner.downcast().unwrap()
+    }
+}
+
+pub trait Api: Clone + Send + 'static + Sync {
+    fn call_method(
+        &self,
+        method: &str,
+        request: TripleRequest,
+    ) -> Pin<Box<dyn Future<Output = Result<TripleResponse, ()>> + Send + 
'static>>;
+
+    fn decode(&self, method: &str, req: HttpRequest<Body>) -> 
Result<TripleRequest, ()>;
+
+    fn encode(&self, method: &str, resp: TripleResponse) -> 
Result<HttpResponse<Body>, ()>;
+}
diff --git a/dubbo-rust-protocol/src/triple/codec.rs 
b/dubbo-rust-protocol/src/triple/codec.rs
new file mode 100644
index 0000000..47a3c2e
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/codec.rs
@@ -0,0 +1,51 @@
+use std::{convert::Infallible, marker::PhantomData};
+
+use http::HeaderMap;
+// triple codec
+use http::request::Request as HttpRequest;
+use http::response::Response as HttpResponse;
+use hyper::body::HttpBody;
+use hyper::Body;
+
+pub struct TripleCodec;
+
+impl TripleCodec {
+    // decode from http to triple request
+    pub async fn decode_from_http(req: HttpRequest<Body>) -> Result<REQ, ()> {
+        let mut req = Box::pin(req);
+        match req.data().await {
+            Some(Ok(b)) => {
+                let msg: Result<REQ, _> = prost::Message::decode(&b[5..]);
+                if let Ok(msg) = msg {
+                    return Ok(msg);
+                }
+            }
+            _ => {}
+        }
+
+        return Err(());
+    }
+
+    // encode from triple response to http
+    pub async fn encode_to_http(req: super::service::) -> 
Result<HttpResponse<Body>, ()> {
+        let mut buffer = Vec::<u8>::new();
+        if let Ok(_) = req.encode(&mut buffer) {
+            buffer.insert(0, buffer.len() as u8);
+            for _ in 0..4 {
+                buffer.insert(0, 0u8);
+            }
+        }
+
+        let (mut trailer, inner) = Body::channel();
+
+        let mut headers = HeaderMap::new();
+        headers.insert("grpc-status", 0i32.into());
+
+        trailer.send_data(buffer.into()).await.unwrap();
+        trailer.send_trailers(headers).await.unwrap();
+
+        let resp = HttpResponse::builder().status(200).body(inner).unwrap();
+
+        return Ok(resp);
+    }
+}
diff --git a/dubbo-rust-protocol/src/triple/mod.rs 
b/dubbo-rust-protocol/src/triple/mod.rs
new file mode 100644
index 0000000..8d18839
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/mod.rs
@@ -0,0 +1,3 @@
+pub mod codec;
+pub mod server;
+pub mod service;
diff --git a/dubbo-rust-protocol/src/triple/server.rs 
b/dubbo-rust-protocol/src/triple/server.rs
new file mode 100644
index 0000000..bf60902
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/server.rs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::future::Future;
+use std::{net::SocketAddr, pin::Pin, task::Poll};
+
+use futures::ready;
+use http::{Request as HttpRequest, Response as HttpResponse};
+use hyper::server::accept::Accept;
+use hyper::server::conn::{AddrIncoming, AddrStream};
+use hyper::server::conn::{Connection, Http};
+use hyper::Body;
+use log::trace;
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::super::{wrap_future, OneConnection, SrvFut, StdError};
+use crate::NamedService;
+use tower::util::BoxCloneService;
+
+pin_project! {
+    pub struct TripleRpcServer<S> {
+        #[pin]
+        incoming: AddrIncoming,
+        rt_handle: tokio::runtime::Handle,
+        service: S,
+    }
+}
+
+impl<S> TripleRpcServer<S> {
+    pub fn new(addr: &SocketAddr, handle: tokio::runtime::Handle, service: S) 
-> Self
+    where
+        S: tower::Service<HttpRequest<Body>> + Clone,
+    {
+        let incoming = AddrIncoming::bind(addr).unwrap();
+        Self {
+            incoming: incoming,
+            rt_handle: handle,
+            service,
+        }
+    }
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<AddrStream, std::io::Error>>> {
+        let me = self.project();
+        me.incoming.poll_accept(cx)
+    }
+}
+
+impl<S> Future for TripleRpcServer<S>
+where
+    S: tower::Service<
+        HttpRequest<Body>,
+        Response = HttpResponse<Body>,
+        Error = StdError,
+        Future = SrvFut<HttpResponse<Body>, StdError>,
+    >,
+    S: Clone + Send + 'static + Unpin,
+{
+    type Output = Result<(), StdError>;
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        loop {
+            let ret = ready!(self.as_mut().poll_next(cx));
+            match ret {
+                Some(Ok(stream)) => {
+                    trace!("Get conn {}", stream.remote_addr());
+
+                    let connection = Http::new()
+                        .http2_only(true)
+                        .http2_enable_connect_protocol()
+                        .serve_connection(stream, self.service.clone());
+
+                    let one_conn = OneConnection { connection };
+                    self.rt_handle.spawn(one_conn);
+                }
+                Some(Err(e)) => return Poll::Ready(Err(e.into())),
+                None => return Poll::Ready(Err("option none".into())),
+            }
+        }
+    }
+}
diff --git a/dubbo-rust-protocol/src/triple/service.rs 
b/dubbo-rust-protocol/src/triple/service.rs
new file mode 100644
index 0000000..267fdcd
--- /dev/null
+++ b/dubbo-rust-protocol/src/triple/service.rs
@@ -0,0 +1,74 @@
+use hyper::Body;
+use hyper::Request as HttpRequest;
+use hyper::Response as HttpResponse;
+use tower::Service;
+
+use crate::Api;
+use crate::SrvFut;
+use crate::StdError;
+
+pub struct TripleService<S> {
+    inner: S,
+}
+
+impl<S: Api> TripleService<S> {
+    pub fn new(srv: S) -> Self {
+        Self { inner: srv }
+    }
+}
+
+impl<S: Api> Service<HttpRequest<Body>> for TripleService<S> {
+    type Response = HttpResponse<Body>;
+
+    type Error = StdError;
+
+    type Future = SrvFut<Self::Response, Self::Error>;
+
+    fn poll_ready(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: HttpRequest<Body>) -> Self::Future {
+        let method_not_found = || {
+            Box::pin(async move {
+                Ok(HttpResponse::builder()
+                    .status(404)
+                    .body(Body::empty())
+                    .unwrap())
+            })
+        };
+        let method_name = req.uri().path().split("/").last();
+        if method_name.is_none() {
+            return method_not_found();
+        }
+        let method_name = method_name.unwrap();
+
+        let inner_service = self.inner.clone();
+        Box::pin(async move {
+            let fail = Ok(HttpResponse::builder()
+                .status(500)
+                .body(Body::empty())
+                .unwrap());
+
+            if let Ok(req) = inner_service.decode(method_name, req) {
+                if let Ok(resp) = inner_service.call_method(method_name, 
req).await {
+                    if let Ok(http_resp) = inner_service.encode(method_name, 
resp) {
+                        Ok(HttpResponse::builder()
+                            .status(200)
+                            .body(Body::empty())
+                            .unwrap())
+                    } else {
+                        return fail;
+                    }
+                } else {
+                    return fail;
+                }
+            } else {
+                return fail;
+            }
+        })
+    }
+}

Reply via email to