This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 54181bf  Feat/cluster (#138)
54181bf is described below

commit 54181bf417536e75ed6a4ff49efaf7a5911fd5d3
Author: Yang Yang <[email protected]>
AuthorDate: Tue May 16 21:29:31 2023 +0800

    Feat/cluster (#138)
    
    * refactor(cluster): add Cluster MockImpl
    
    * refactor(triple): use ClientBuilder to init Cluster ability
    
    * Update builder.rs
    
    update default direct value
    
    * Update triple.rs
    
    handle unused var
    
    * Update mod.rs
    
    comment some codes
    
    * refactor(triple): rm unused var in clientBuilder
---
 dubbo/Cargo.toml                            |   1 +
 dubbo/src/cluster/mod.rs                    | 101 ++++++++++++++++++++++++----
 dubbo/src/protocol/mod.rs                   |  12 +++-
 dubbo/src/protocol/triple/triple_invoker.rs |  26 ++++---
 dubbo/src/triple/client/builder.rs          |  44 ++++++++----
 dubbo/src/triple/client/triple.rs           |  33 +++------
 6 files changed, 159 insertions(+), 58 deletions(-)

diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 94a2bd8..91b19d6 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,6 +34,7 @@ axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
 aws-smithy-http = "0.54.1"
+dyn-clone = "1.0.11"
 itertools.workspace = true
 urlencoding.workspace = true
 lazy_static.workspace = true
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index b6d8a7c..4f73d2f 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,24 +15,41 @@
  * limitations under the License.
  */
 
-use std::{sync::Arc, task::Poll};
+use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};
 
 use aws_smithy_http::body::SdkBody;
-use tower_service::Service;
+use dubbo_base::Url;
 
-use crate::{empty_body, protocol::BoxInvoker};
+use crate::{
+    empty_body,
+    invocation::RpcInvocation,
+    protocol::{BoxInvoker, Invoker},
+};
 
 pub mod directory;
 pub mod loadbalance;
 pub mod support;
 
-pub trait Directory {
-    fn list(&self, meta: String) -> Vec<BoxInvoker>;
+pub trait Directory: Debug {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
     fn is_empty(&self) -> bool;
 }
 
-type BoxDirectory = Box<dyn Directory>;
+type BoxDirectory = Box<dyn Directory + Send + Sync>;
 
+pub trait Cluster {
+    fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+}
+
+#[derive(Debug, Default)]
+pub struct MockCluster {}
+
+impl Cluster for MockCluster {
+    fn join(&self, dir: BoxDirectory) -> BoxInvoker {
+        Box::new(FailoverCluster::new(dir))
+    }
+}
+#[derive(Clone, Debug)]
 pub struct FailoverCluster {
     dir: Arc<BoxDirectory>,
 }
@@ -43,7 +60,7 @@ impl FailoverCluster {
     }
 }
 
-impl Service<http::Request<SdkBody>> for FailoverCluster {
+impl Invoker<http::Request<SdkBody>> for FailoverCluster {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
@@ -66,7 +83,11 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
             .method(req.method().clone());
         *clone_req.headers_mut().unwrap() = req.headers().clone();
         let r = clone_req.body(clone_body).unwrap();
-        let invokers = self.dir.list("service_name".to_string());
+        let invokers = self.dir.list(
+            RpcInvocation::default()
+                .with_service_unique_name("hello".to_string())
+                .into(),
+        );
         for mut invoker in invokers {
             let fut = async move {
                 let res = invoker.call(r).await;
@@ -83,19 +104,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
                 .unwrap())
         })
     }
+
+    fn get_url(&self) -> dubbo_base::Url {
+        Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct MockDirectory {
+    // router_chain: RouterChain,
+    invokers: Vec<BoxInvoker>,
 }
 
-pub struct MockDirectory {}
+impl MockDirectory {
+    pub fn new(invokers: Vec<BoxInvoker>) -> MockDirectory {
+        Self {
+            // router_chain: RouterChain::default(),
+            invokers,
+        }
+    }
+}
 
 impl Directory for MockDirectory {
-    fn list(&self, _meta: String) -> Vec<BoxInvoker> {
+    fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         // tracing::info!("MockDirectory: {}", meta);
-        // let u = 
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+        let _u = 
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
         // vec![Box::new(TripleInvoker::new(u))]
-        todo!()
+        // self.router_chain.route(u, invo);
+        self.invokers.clone()
     }
 
     fn is_empty(&self) -> bool {
         false
     }
 }
+
+#[derive(Debug, Default)]
+pub struct RouterChain {
+    router: HashMap<String, BoxRouter>,
+    invokers: Vec<BoxInvoker>,
+}
+
+impl RouterChain {
+    pub fn route(&self, url: Url, invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> 
{
+        let r = self.router.get("mock").unwrap();
+        r.route(self.invokers.clone(), url, invo)
+    }
+}
+
+pub trait Router: Debug {
+    fn route(
+        &self,
+        invokers: Vec<BoxInvoker>,
+        url: Url,
+        invo: Arc<RpcInvocation>,
+    ) -> Vec<BoxInvoker>;
+}
+
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
+
+#[derive(Debug, Default)]
+pub struct MockRouter {}
+
+impl Router for MockRouter {
+    fn route(
+        &self,
+        invokers: Vec<BoxInvoker>,
+        _url: Url,
+        _invo: Arc<RpcInvocation>,
+    ) -> Vec<BoxInvoker> {
+        invokers
+    }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 58dca5f..145bcc8 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -23,6 +23,7 @@ use std::{
 
 use async_trait::async_trait;
 use aws_smithy_http::body::SdkBody;
+use dyn_clone::DynClone;
 use tower_service::Service;
 
 use dubbo_base::Url;
@@ -43,7 +44,7 @@ pub trait Exporter {
     fn unexport(&self);
 }
 
-pub trait Invoker<ReqBody>: Debug {
+pub trait Invoker<ReqBody>: Debug + DynClone {
     type Response;
 
     type Error;
@@ -68,6 +69,15 @@ pub type BoxInvoker = Box<
         + Sync,
 >;
 
+dyn_clone::clone_trait_object!(
+    Invoker<
+        http::Request<SdkBody>,
+        Response = http::Response<crate::BoxBody>,
+        Error = crate::Error,
+        Future = crate::BoxFuture<http::Response<crate::BoxBody>, 
crate::Error>,
+    >
+);
+
 pub struct WrapperInvoker<T>(T);
 
 impl<T, ReqBody> Service<http::Request<ReqBody>> for WrapperInvoker<T>
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs 
b/dubbo/src/protocol/triple/triple_invoker.rs
index 6139cc9..fb661f9 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -17,24 +17,32 @@
 
 use aws_smithy_http::body::SdkBody;
 use dubbo_base::Url;
-use std::fmt::{Debug, Formatter};
+use std::{
+    fmt::{Debug, Formatter},
+    str::FromStr,
+};
 use tower_service::Service;
 
-use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
+use crate::{
+    protocol::Invoker,
+    triple::{client::builder::ClientBoxService, 
transport::connection::Connection},
+    utils::boxed_clone::BoxCloneService,
+};
 
+#[derive(Clone)]
 pub struct TripleInvoker {
     url: Url,
     conn: ClientBoxService,
 }
 
 impl TripleInvoker {
-    // pub fn new(url: Url) -> TripleInvoker {
-    //     let uri = http::Uri::from_str(&url.to_url()).unwrap();
-    //     Self {
-    //         url,
-    //         conn: ClientBuilder::from_uri(&uri).build()connect(),
-    //     }
-    // }
+    pub fn new(url: Url) -> TripleInvoker {
+        let uri = http::Uri::from_str(&url.to_url()).unwrap();
+        Self {
+            url,
+            conn: BoxCloneService::new(Connection::new().with_host(uri)),
+        }
+    }
 }
 
 impl Debug for TripleInvoker {
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index cf667cc..29957a6 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -16,18 +16,19 @@
  */
 
 use crate::{
-    cluster::directory::StaticDirectory,
-    codegen::{ClusterInvoker, Directory, RegistryDirectory},
+    cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory},
+    codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker},
     triple::compression::CompressionEncoding,
-    utils::boxed::BoxService,
+    utils::boxed_clone::BoxCloneService,
 };
 
 use aws_smithy_http::body::SdkBody;
+use dubbo_base::Url;
 
 use super::TripleClient;
 
 pub type ClientBoxService =
-    BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>, 
crate::Error>;
+    BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, 
crate::Error>;
 
 #[derive(Clone, Debug, Default)]
 pub struct ClientBuilder {
@@ -35,6 +36,8 @@ pub struct ClientBuilder {
     pub connector: &'static str,
     directory: Option<Box<dyn Directory>>,
     cluster_invoker: Option<ClusterInvoker>,
+    pub direct: bool,
+    host: String,
 }
 
 impl ClientBuilder {
@@ -44,6 +47,8 @@ impl ClientBuilder {
             connector: "",
             directory: None,
             cluster_invoker: None,
+            direct: false,
+            host: "".to_string(),
         }
     }
 
@@ -53,15 +58,8 @@ impl ClientBuilder {
             connector: "",
             directory: Some(Box::new(StaticDirectory::new(&host))),
             cluster_invoker: None,
-        }
-    }
-
-    pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
-        Self {
-            timeout: None,
-            connector: "",
-            directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
-            cluster_invoker: None,
+            direct: true,
+            host: host.clone().to_string(),
         }
     }
 
@@ -104,11 +102,29 @@ impl ClientBuilder {
         }
     }
 
+    pub fn with_direct(self, direct: bool) -> Self {
+        Self { direct, ..self }
+    }
+
     pub fn build(self) -> TripleClient {
-        TripleClient {
+        let mut cli = TripleClient {
             send_compression_encoding: Some(CompressionEncoding::Gzip),
             directory: self.directory,
             cluster_invoker: self.cluster_invoker,
+            invoker: None,
+        };
+        if self.direct {
+            cli.invoker = Some(Box::new(TripleInvoker::new(
+                Url::from_url(&self.host).unwrap(),
+            )));
+            return cli;
         }
+
+        let cluster = 
MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new(
+            
TripleInvoker::new(Url::from_url("http://127.0.0.1:8888";).unwrap()),
+        )])));
+
+        cli.invoker = Some(cluster);
+        cli
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index 56edb96..eb7934d 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -30,6 +30,7 @@ use crate::codegen::{ClusterInvoker, Directory, 
RpcInvocation};
 use crate::{
     cluster::support::cluster_invoker::ClusterRequestBuilder,
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
+    protocol::BoxInvoker,
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, 
encode::encode},
 };
 
@@ -38,11 +39,12 @@ pub struct TripleClient {
     pub(crate) send_compression_encoding: Option<CompressionEncoding>,
     pub(crate) directory: Option<Box<dyn Directory>>,
     pub(crate) cluster_invoker: Option<ClusterInvoker>,
+    pub invoker: Option<BoxInvoker>,
 }
 
 impl TripleClient {
     pub fn connect(host: String) -> Self {
-        let builder = ClientBuilder::from_static(&host);
+        let builder = ClientBuilder::from_static(&host).with_direct(true);
 
         builder.build()
     }
@@ -135,7 +137,7 @@ impl TripleClient {
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
-        invocation: RpcInvocation,
+        _invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -150,27 +152,14 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
-        let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = 
self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, 
arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut 
rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
+        let bytes = hyper::body::to_bytes(body).await.unwrap();
+        let sdk_body = SdkBody::from(bytes);
+
+        // let mut conn = Connection::new().with_host(http_uri);
+        let mut conn = self.invoker.clone().unwrap();
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
-        let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
             .await

Reply via email to