This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch feat/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/feat/cluster by this push:
     new b2e829c  Feat(cluster): Cluster Policy Impl (#146)
b2e829c is described below

commit b2e829cc397679a28a729ca7de7b59c391f00641
Author: Yang Yang <[email protected]>
AuthorDate: Tue Aug 1 10:40:30 2023 +0800

    Feat(cluster): Cluster Policy Impl (#146)
    
    * refactor(cluster): comment the logic of clone body
    
    * Rft(triple): remove Clone of Invoker
    
    * Rft(cluster): use ready_cache to manage Invokers, add ready_cache in FailoverCluster
    
    * Rft(protocol): use interface Inheritance to redesign Invoker
    
    ---------
    
    Co-authored-by: G-XD <[email protected]>
    Co-authored-by: GXD <[email protected]>
---
 config/src/protocol.rs                       |  20 +++-
 dubbo-build/src/client.rs                    |   5 -
 dubbo/Cargo.toml                             |   7 +-
 dubbo/src/cluster/directory.rs               |  69 ++++--------
 dubbo/src/cluster/mod.rs                     | 133 ++++++++++++++++-------
 dubbo/src/cluster/support/cluster_invoker.rs | 147 --------------------------
 dubbo/src/cluster/support/mod.rs             |  20 ----
 dubbo/src/codegen.rs                         |   7 +-
 dubbo/src/protocol/mod.rs                    |  13 +--
 dubbo/src/protocol/triple/triple_invoker.rs  |  38 ++++---
 dubbo/src/registry/integration.rs            |   7 --
 dubbo/src/registry/mod.rs                    |  17 ---
 dubbo/src/triple/client/builder.rs           |  64 ++++++------
 dubbo/src/triple/client/triple.rs            | 151 ++++++++++-----------------
 examples/echo/Cargo.toml                     |   4 +-
 examples/echo/src/echo/client.rs             |   4 +-
 examples/greeter/Cargo.toml                  |   2 +-
 registry/zookeeper/src/lib.rs                |  12 +--
 18 files changed, 258 insertions(+), 462 deletions(-)

diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 86ff053..e2340c1 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -77,10 +77,26 @@ impl ProtocolRetrieve for ProtocolConfig {
         } else {
             let result = self.get_protocol(protocol_key);
             if let Some(..) = result {
-                panic!("default triple base dose not defined.")
-            } else {
                 result.unwrap()
+            } else {
+                panic!("default triple base dose not defined.")
             }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+
+    use super::{ProtocolConfig, ProtocolRetrieve};
+
+    #[test]
+    #[should_panic(expected = "default triple base dose not defined")]
+    pub fn test_get_invalid_protocol() {
+        let config = ProtocolConfig::default();
+
+        let _ = config.get_protocol_or_default("");
+
+        ()
+    }
+}
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 1e1c9fb..bfcfe45 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,11 +90,6 @@ pub fn generate<T: Service>(
                     }
                 }
 
-                pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
-                    self.inner = self.inner.with_cluster(invoker);
-                    self
-                }
-
                 #methods
 
             }
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 94a2bd8..51ccc19 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -10,11 +10,11 @@ repository = "https://github.com/apache/dubbo-rust.git"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-hyper = { version = "0.14.19", features = ["full"] }
+hyper = { version = "0.14.26", features = ["full"] }
 http = "0.2"
 tower-service.workspace = true
 http-body = "0.4.4"
-tower = { workspace = true, features = ["timeout"] }
+tower = { workspace = true, features = ["timeout", "ready-cache"] }
 futures-util = "0.3.23"
 futures-core ="0.3.23"
 argh = "0.1"
@@ -33,7 +33,8 @@ futures.workspace = true
 axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
-aws-smithy-http = "0.54.1"
+aws-smithy-http = "0.55.2"
+dyn-clone = "1.0.11"
 itertools.workspace = true
 urlencoding.workspace = true
 lazy_static.workspace = true
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index e74fc6d..afe9657 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -23,39 +23,21 @@ use std::{
 };
 
 use crate::{
+    codegen::TripleInvoker,
     invocation::{Invocation, RpcInvocation},
-    registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
+    protocol::BoxInvoker,
+    registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
 };
 use dubbo_base::Url;
 use dubbo_logger::tracing;
 
+use crate::cluster::Directory;
+
 /// Directory.
 ///
 /// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-pub trait Directory: Debug + DirectoryClone {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
-}
 
-pub trait DirectoryClone {
-    fn clone_box(&self) -> Box<dyn Directory>;
-}
-
-impl<T> DirectoryClone for T
-where
-    T: 'static + Directory + Clone,
-{
-    fn clone_box(&self) -> Box<dyn Directory> {
-        Box::new(self.clone())
-    }
-}
-
-impl Clone for Box<dyn Directory> {
-    fn clone(&self) -> Box<dyn Directory> {
-        self.clone_box()
-    }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct StaticDirectory {
     uri: http::Uri,
 }
@@ -78,7 +60,7 @@ impl StaticDirectory {
 }
 
 impl Directory for StaticDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let url = Url::from_url(&format!(
             "tri://{}:{}/{}",
             self.uri.host().unwrap(),
@@ -86,43 +68,28 @@ impl Directory for StaticDirectory {
             invocation.get_target_service_unique_name(),
         ))
         .unwrap();
-        vec![url]
-    }
-}
-
-impl DirectoryClone for StaticDirectory {
-    fn clone_box(&self) -> Box<dyn Directory> {
-        Box::new(StaticDirectory {
-            uri: self.uri.clone(),
-        })
+        let invoker = Box::new(TripleInvoker::new(url));
+        vec![invoker]
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct RegistryDirectory {
-    registry: RegistryWrapper,
+    registry: Arc<BoxRegistry>,
     service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
 }
 
 impl RegistryDirectory {
     pub fn new(registry: BoxRegistry) -> RegistryDirectory {
         RegistryDirectory {
-            registry: RegistryWrapper {
-                registry: Some(registry),
-            },
+            registry: Arc::new(registry),
             service_instances: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 }
 
-impl DirectoryClone for RegistryDirectory {
-    fn clone_box(&self) -> Box<dyn Directory> {
-        todo!()
-    }
-}
-
 impl Directory for RegistryDirectory {
-    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let service_name = invocation.get_target_service_unique_name();
 
         let url = Url::from_url(&format!(
@@ -132,9 +99,6 @@ impl Directory for RegistryDirectory {
         .unwrap();
 
         self.registry
-            .registry
-            .as_ref()
-            .expect("msg")
             .subscribe(
                 url,
                 Arc::new(MemoryNotifyListener {
@@ -149,6 +113,11 @@ impl Directory for RegistryDirectory {
             .expect("service_instances.read");
         let binding = Vec::new();
         let url_vec = map.get(&service_name).unwrap_or(&binding);
-        url_vec.to_vec()
+        // url_vec.to_vec()
+        let mut invokers: Vec<BoxInvoker> = vec![];
+        for item in url_vec.iter() {
+            invokers.push(Box::new(TripleInvoker::new(item.clone())));
+        }
+        invokers
     }
 }
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index b6d8a7c..afc3cc9 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,31 +15,54 @@
  * limitations under the License.
  */
 
-use std::{sync::Arc, task::Poll};
+use std::{fmt::Debug, sync::Arc, task::Poll};
 
 use aws_smithy_http::body::SdkBody;
+use dubbo_base::Url;
+use futures_util::TryFutureExt;
+use tower::ready_cache::ReadyCache;
 use tower_service::Service;
 
-use crate::{empty_body, protocol::BoxInvoker};
+use crate::{
+    invocation::RpcInvocation,
+    protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
+};
 
 pub mod directory;
 pub mod loadbalance;
-pub mod support;
 
-pub trait Directory {
-    fn list(&self, meta: String) -> Vec<BoxInvoker>;
-    fn is_empty(&self) -> bool;
+pub trait Directory: Debug {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
 }
 
-type BoxDirectory = Box<dyn Directory>;
+type BoxDirectory = Box<dyn Directory + Send + Sync>;
 
+pub trait Cluster {
+    fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+}
+
+#[derive(Debug, Default)]
+pub struct MockCluster {}
+
+impl Cluster for MockCluster {
+    fn join(&self, dir: BoxDirectory) -> BoxInvoker {
+        Box::new(FailoverCluster::new(dir))
+    }
+}
+
+// 在Cluster上进行缓存Service
+#[derive(Debug)]
 pub struct FailoverCluster {
     dir: Arc<BoxDirectory>,
+    caches: ReadyCache<usize, BoxInvoker, http::Request<SdkBody>>,
 }
 
 impl FailoverCluster {
     pub fn new(dir: BoxDirectory) -> FailoverCluster {
-        Self { dir: Arc::new(dir) }
+        Self {
+            dir: Arc::new(dir),
+            caches: ReadyCache::default(),
+        }
     }
 }
 
@@ -59,43 +82,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
     }
 
     fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
-        println!("req: {}", req.body().content_length().unwrap());
-        let clone_body = req.body().try_clone().unwrap();
-        let mut clone_req = http::Request::builder()
-            .uri(req.uri().clone())
-            .method(req.method().clone());
-        *clone_req.headers_mut().unwrap() = req.headers().clone();
-        let r = clone_req.body(clone_body).unwrap();
-        let invokers = self.dir.list("service_name".to_string());
-        for mut invoker in invokers {
-            let fut = async move {
-                let res = invoker.call(r).await;
-                return res;
-            };
-            return Box::pin(fut);
+        // let clone_body = req.body().try_clone().unwrap();
+        // let mut clone_req = http::Request::builder()
+        //     .uri(req.uri().clone())
+        //     .method(req.method().clone());
+        // *clone_req.headers_mut().unwrap() = req.headers().clone();
+        // let r = clone_req.body(clone_body).unwrap();
+        let invokers = self.dir.list(
+            RpcInvocation::default()
+                .with_service_unique_name("hello".to_string())
+                .into(),
+        );
+        let mut i: usize = 0;
+        for invoker in invokers {
+            self.caches.push(i, invoker);
+            i += 1;
         }
-        Box::pin(async move {
-            Ok(http::Response::builder()
-                .status(200)
-                .header("grpc-status", "12")
-                .header("content-type", "application/grpc")
-                .body(empty_body())
-                .unwrap())
-        })
+
+        Box::pin(self.caches.call_ready_index(0, req).map_err(Into::into))
     }
 }
 
-pub struct MockDirectory {}
+impl Invoker<http::Request<SdkBody>> for FailoverCluster {
+    fn get_url(&self) -> dubbo_base::Url {
+        Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
+    }
+}
 
-impl Directory for MockDirectory {
-    fn list(&self, _meta: String) -> Vec<BoxInvoker> {
-        // tracing::info!("MockDirectory: {}", meta);
-        // let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
-        // vec![Box::new(TripleInvoker::new(u))]
-        todo!()
+#[derive(Debug, Default)]
+pub struct MockDirectory {
+    // router_chain: RouterChain,
+}
+
+impl MockDirectory {
+    pub fn new() -> MockDirectory {
+        Self {
+            // router_chain: RouterChain::default(),
+        }
     }
+}
 
-    fn is_empty(&self) -> bool {
-        false
+impl Directory for MockDirectory {
+    fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
+        // tracing::info!("MockDirectory: {}", meta);
+        let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+        vec![Box::new(TripleInvoker::new(u))]
+        // self.router_chain.route(u, invo);
     }
 }
+
+// #[derive(Debug, Default)]
+// pub struct RouterChain {
+//     router: HashMap<String, BoxRouter>,
+//     invokers: Arc<Vec<BoxInvoker>>,
+// }
+
+// impl RouterChain {
+//     pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
+//         let r = self.router.get("mock").unwrap();
+//         r.route(self.invokers.clone(), url, invo)
+//     }
+// }
+
+// pub trait Router: Debug {
+//     fn route(
+//         &self,
+//         invokers: Arc<Vec<BoxInvoker>>,
+//         url: Url,
+//         invo: Arc<RpcInvocation>,
+//     ) -> Arc<Vec<BoxInvoker>>;
+// }
+
+// pub type BoxRouter = Box<dyn Router + Sync + Send>;
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
deleted file mode 100644
index 0ccca48..0000000
--- a/dubbo/src/cluster/support/cluster_invoker.rs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use aws_smithy_http::body::SdkBody;
-use std::{str::FromStr, sync::Arc};
-
-use dubbo_base::Url;
-use http::{uri::PathAndQuery, Request};
-
-use crate::{
-    cluster::{
-        loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
-        support::DEFAULT_LOADBALANCE,
-    },
-    codegen::{Directory, RegistryDirectory, TripleClient},
-    invocation::RpcInvocation,
-};
-
-#[derive(Debug, Clone)]
-pub struct ClusterInvoker {
-    directory: Arc<RegistryDirectory>,
-    destroyed: bool,
-}
-
-pub trait ClusterInvokerSelector {
-    /// Select a invoker using loadbalance policy.
-    fn select(
-        &self,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-        excluded: Arc<Vec<Url>>,
-    ) -> Option<Url>;
-
-    fn do_select(
-        &self,
-        loadbalance_key: Option<&str>,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-    ) -> Option<Url>;
-}
-
-pub trait ClusterRequestBuilder {
-    fn build_req(
-        &self,
-        triple_client: &mut TripleClient,
-        path: http::uri::PathAndQuery,
-        invocation: Arc<RpcInvocation>,
-        body: SdkBody,
-    ) -> http::Request<SdkBody>;
-}
-
-impl ClusterInvoker {
-    pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
-        ClusterInvoker {
-            directory: Arc::new(registry_directory),
-            destroyed: false,
-        }
-    }
-
-    pub fn directory(&self) -> Arc<RegistryDirectory> {
-        self.directory.clone()
-    }
-
-    pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
-        if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
-            LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
-        } else {
-            println!(
-                "loadbalance {} not found, use default loadbalance {}",
-                loadbalance_key, DEFAULT_LOADBALANCE
-            );
-            LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
-        }
-    }
-
-    pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
-        !self.destroyed() && !self.directory.list(invocation).is_empty()
-    }
-
-    pub fn destroyed(&self) -> bool {
-        self.destroyed
-    }
-}
-
-impl ClusterInvokerSelector for ClusterInvoker {
-    fn select(
-        &self,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-        _excluded: Arc<Vec<Url>>,
-    ) -> Option<Url> {
-        if invokers.is_empty() {
-            return None;
-        }
-        let instance_count = invokers.len();
-        return if instance_count == 1 {
-            Some(invokers.as_ref().first()?.clone())
-        } else {
-            let loadbalance = Some(DEFAULT_LOADBALANCE);
-            self.do_select(loadbalance, invocation, invokers)
-        };
-    }
-
-    /// picking instance invoker url from registry directory
-    fn do_select(
-        &self,
-        loadbalance_key: Option<&str>,
-        invocation: Arc<RpcInvocation>,
-        invokers: Arc<Vec<Url>>,
-    ) -> Option<Url> {
-        let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
-        loadbalance.select(invokers, None, invocation)
-    }
-}
-
-impl ClusterRequestBuilder for ClusterInvoker {
-    fn build_req(
-        &self,
-        triple_client: &mut TripleClient,
-        path: PathAndQuery,
-        invocation: Arc<RpcInvocation>,
-        body: SdkBody,
-    ) -> Request<SdkBody> {
-        let invokers = self.directory.list(invocation.clone());
-        let invoker_url = self
-            .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
-            .expect("no valid provider");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
-                .unwrap();
-        triple_client.map_request(http_uri, path, body)
-    }
-}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
deleted file mode 100644
index ae42cc2..0000000
--- a/dubbo/src/cluster/support/mod.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-pub mod cluster_invoker;
-
-pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 98d784f..412feb9 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,14 +27,11 @@ pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
 pub use super::{
-    cluster::{
-        directory::{Directory, RegistryDirectory},
-        support::cluster_invoker::ClusterInvoker,
-    },
+    cluster::directory::RegistryDirectory,
     empty_body,
     invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
     protocol::{triple::triple_invoker::TripleInvoker, Invoker},
-    registry::{BoxRegistry, Registry, RegistryWrapper},
+    registry::{BoxRegistry, Registry},
     triple::{
         client::TripleClient,
         codec::{prost::ProstCodec, Codec},
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 58dca5f..4dceb45 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -17,7 +17,6 @@
 
 use std::{
     fmt::Debug,
-    future::Future,
     task::{Context, Poll},
 };
 
@@ -43,18 +42,8 @@ pub trait Exporter {
     fn unexport(&self);
 }
 
-pub trait Invoker<ReqBody>: Debug {
-    type Response;
-
-    type Error;
-
-    type Future: Future<Output = Result<Self::Response, Self::Error>>;
-
+pub trait Invoker<ReqBody>: Debug + Service<ReqBody> {
     fn get_url(&self) -> Url;
-
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
-
-    fn call(&mut self, req: ReqBody) -> Self::Future;
 }
 
 pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 6139cc9..db18f5f 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -17,24 +17,32 @@
 
 use aws_smithy_http::body::SdkBody;
 use dubbo_base::Url;
-use std::fmt::{Debug, Formatter};
+use std::{
+    fmt::{Debug, Formatter},
+    str::FromStr,
+};
 use tower_service::Service;
 
-use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
+use crate::{
+    protocol::Invoker,
+    triple::{client::builder::ClientBoxService, transport::connection::Connection},
+    utils::boxed_clone::BoxCloneService,
+};
 
+#[derive(Clone)]
 pub struct TripleInvoker {
     url: Url,
     conn: ClientBoxService,
 }
 
 impl TripleInvoker {
-    // pub fn new(url: Url) -> TripleInvoker {
-    //     let uri = http::Uri::from_str(&url.to_url()).unwrap();
-    //     Self {
-    //         url,
-    //         conn: ClientBuilder::from_uri(&uri).build()connect(),
-    //     }
-    // }
+    pub fn new(url: Url) -> TripleInvoker {
+        let uri = http::Uri::from_str(&url.to_url()).unwrap();
+        Self {
+            url,
+            conn: BoxCloneService::new(Connection::new().with_host(uri)),
+        }
+    }
 }
 
 impl Debug for TripleInvoker {
@@ -43,17 +51,13 @@ impl Debug for TripleInvoker {
     }
 }
 
-impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+impl Service<http::Request<SdkBody>> for TripleInvoker {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
 
     type Future = crate::BoxFuture<Self::Response, Self::Error>;
 
-    fn get_url(&self) -> Url {
-        self.url.clone()
-    }
-
     fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
         self.conn.call(req)
     }
@@ -65,3 +69,9 @@ impl Invoker<http::Request<SdkBody>> for TripleInvoker {
         self.conn.poll_ready(cx)
     }
 }
+
+impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+    fn get_url(&self) -> Url {
+        self.url.clone()
+    }
+}
diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs
index 15b82d0..2944f98 100644
--- a/dubbo/src/registry/integration.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,10 +14,3 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use crate::{cluster::support::cluster_invoker::ClusterInvoker, registry::BoxRegistry};
-use std::sync::Arc;
-
-pub trait ClusterRegistryIntegration {
-    /// get cluster invoker struct
-    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
-}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 31106f0..2a95452 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -60,20 +60,3 @@ impl Debug for BoxRegistry {
         f.write_str("BoxRegistry")
     }
 }
-
-#[derive(Default)]
-pub struct RegistryWrapper {
-    pub registry: Option<Box<dyn Registry>>,
-}
-
-impl Clone for RegistryWrapper {
-    fn clone(&self) -> Self {
-        Self { registry: None }
-    }
-}
-
-impl Debug for RegistryWrapper {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RegistryWrapper").finish()
-    }
-}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index cf667cc..d68067a 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,26 +15,28 @@
  * limitations under the License.
  */
 
+use std::sync::Arc;
+
 use crate::{
-    cluster::directory::StaticDirectory,
-    codegen::{ClusterInvoker, Directory, RegistryDirectory},
-    triple::compression::CompressionEncoding,
-    utils::boxed::BoxService,
+    cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
+    codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+    protocol::BoxInvoker,
+    utils::boxed_clone::BoxCloneService,
 };
 
 use aws_smithy_http::body::SdkBody;
-
-use super::TripleClient;
+use dubbo_base::Url;
 
 pub type ClientBoxService =
-    BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
+    BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
 
 #[derive(Clone, Debug, Default)]
 pub struct ClientBuilder {
     pub timeout: Option<u64>,
     pub connector: &'static str,
-    directory: Option<Box<dyn Directory>>,
-    cluster_invoker: Option<ClusterInvoker>,
+    directory: Option<Arc<Box<dyn Directory>>>,
+    pub direct: bool,
+    host: String,
 }
 
 impl ClientBuilder {
@@ -43,7 +45,8 @@ impl ClientBuilder {
             timeout: None,
             connector: "",
             directory: None,
-            cluster_invoker: None,
+            direct: false,
+            host: "".to_string(),
         }
     }
 
@@ -51,17 +54,9 @@ impl ClientBuilder {
         Self {
             timeout: None,
             connector: "",
-            directory: Some(Box::new(StaticDirectory::new(&host))),
-            cluster_invoker: None,
-        }
-    }
-
-    pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
-        Self {
-            timeout: None,
-            connector: "",
-            directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
-            cluster_invoker: None,
+            directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))),
+            direct: true,
+            host: host.clone().to_string(),
         }
     }
 
@@ -75,23 +70,21 @@ impl ClientBuilder {
     /// host: http://0.0.0.0:8888
     pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
         Self {
-            directory: Some(directory),
-            cluster_invoker: None,
+            directory: Some(Arc::new(directory)),
             ..self
         }
     }
 
     pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
         Self {
-            directory: None,
-            cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
+            directory: Some(Arc::new(Box::new(registry))),
             ..self
         }
     }
 
     pub fn with_host(self, host: &'static str) -> Self {
         Self {
-            directory: Some(Box::new(StaticDirectory::new(&host))),
+            directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))),
             ..self
         }
     }
@@ -99,16 +92,23 @@ impl ClientBuilder {
     pub fn with_connector(self, connector: &'static str) -> Self {
         Self {
             connector: connector,
-            cluster_invoker: None,
             ..self
         }
     }
 
-    pub fn build(self) -> TripleClient {
-        TripleClient {
-            send_compression_encoding: Some(CompressionEncoding::Gzip),
-            directory: self.directory,
-            cluster_invoker: self.cluster_invoker,
+    pub fn with_direct(self, direct: bool) -> Self {
+        Self { direct, ..self }
+    }
+
+    pub fn build(self, _invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
+        if self.direct {
+            return Some(Box::new(TripleInvoker::new(
+                Url::from_url(&self.host).unwrap(),
+            )));
         }
+
+        let cluster = MockCluster::default().join(Box::new(MockDirectory::new()));
+
+        return Some(cluster);
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 56edb96..b81661c 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,46 +15,41 @@
  * limitations under the License.
  */
 
-use std::{str::FromStr, sync::Arc};
+use std::str::FromStr;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 
 use aws_smithy_http::body::SdkBody;
 use http::HeaderValue;
-use rand::prelude::SliceRandom;
-use tower_service::Service;
 
-use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
+use super::builder::ClientBuilder;
+use crate::codegen::RpcInvocation;
 
 use crate::{
-    cluster::support::cluster_invoker::ClusterRequestBuilder,
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
 };
 
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct TripleClient {
     pub(crate) send_compression_encoding: Option<CompressionEncoding>,
-    pub(crate) directory: Option<Box<dyn Directory>>,
-    pub(crate) cluster_invoker: Option<ClusterInvoker>,
+    pub(crate) builder: Option<ClientBuilder>,
 }
 
 impl TripleClient {
     pub fn connect(host: String) -> Self {
-        let builder = ClientBuilder::from_static(&host);
+        let builder = ClientBuilder::from_static(&host).with_direct(true);
 
-        builder.build()
+        TripleClient {
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
+            builder: Some(builder),
+        }
     }
 
     pub fn new(builder: ClientBuilder) -> Self {
-        builder.build()
-    }
-
-    pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
         TripleClient {
-            cluster_invoker: Some(invoker),
-            ..self
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
+            builder: Some(builder),
         }
     }
 
@@ -150,27 +145,19 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
-        let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
+        let bytes = hyper::body::to_bytes(body).await.unwrap();
+        let sdk_body = SdkBody::from(bytes);
+
+        let mut conn = self
+            .builder
+            .clone()
+            .unwrap()
+            .build(invocation.into())
+            .unwrap();
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
-        let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
             .await
@@ -225,25 +212,17 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
-        let mut conn = Connection::new().with_host(http_uri);
+
+        let mut conn = self
+            .builder
+            .clone()
+            .unwrap()
+            .build(invocation.into())
+            .unwrap();
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
         let response = conn
             .call(req)
             .await
@@ -282,25 +261,18 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
-        let mut conn = Connection::new().with_host(http_uri);
+
+        let mut conn = self
+            .builder
+            .clone()
+            .unwrap()
+            .build(invocation.into())
+            .unwrap();
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
+        // let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
             .await
@@ -355,26 +327,17 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-        let arc_invocation = Arc::new(invocation);
-        let req;
-        let http_uri;
-        if self.cluster_invoker.is_some() {
-            let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
-            req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
-            http_uri = req.uri().clone();
-        } else {
-            let url_list = self
-                .directory
-                .as_ref()
-                .expect("msg")
-                .list(arc_invocation.clone());
-            let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-            http_uri =
-                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-            req = self.map_request(http_uri.clone(), path, sdk_body);
-        }
 
-        let mut conn = Connection::new().with_host(http_uri);
+        let mut conn = self
+            .builder
+            .clone()
+            .unwrap()
+            .build(invocation.into())
+            .unwrap();
+
+        let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
+
         let response = conn
             .call(req)
             .await
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 319c17b..bc6638b 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -30,9 +30,7 @@ async-trait = "0.1.56"
 tokio-stream = "0.1"
 dubbo-logger.workspace=true
 
-hyper = { version = "0.14.19", features = ["full"]}
-
-dubbo = {path = "../../dubbo", version = "0.3.0" }
+dubbo = {path = "../../dubbo"}
 dubbo-config = {path = "../../config", version = "0.3.0" }
 registry-zookeeper.workspace=true
 
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index db46958..0a2f150 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,7 +34,9 @@ async fn main() {
     // let builder = ClientBuilder::new()
     //     .with_connector("unix")
     //     .with_host("unix://127.0.0.1:8888");
-    let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
+    let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
+        .with_timeout(1000000)
+        .with_direct(true);
     let mut cli = EchoClient::new(builder);
     // let mut unary_cli = cli.clone().with_filter(FakeFilter {});
     // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 6ad3b1b..d68cab7 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -29,7 +29,7 @@ prost = "0.10.4"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 dubbo-logger = { path = "../../common/logger" }
-dubbo = { path = "../../dubbo", version = "0.3.0" }
+dubbo = { path = "../../dubbo"}
 dubbo-config = { path = "../../config", version = "0.3.0" }
 registry-zookeeper.workspace = true
 registry-nacos.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 5debc0a..e8c2c5c 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -34,11 +34,9 @@ use serde::{Deserialize, Serialize};
 use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
 
 use dubbo::{
-    cluster::support::cluster_invoker::ClusterInvoker,
-    codegen::BoxRegistry,
     registry::{
-        integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
-        Registry, RegistryNotifyListener, ServiceEvent,
+        memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener,
+        ServiceEvent,
     },
     StdError,
 };
@@ -371,12 +369,6 @@ impl NotifyListener for ServiceInstancesChangedListener {
     }
 }
 
-impl ClusterRegistryIntegration for ZookeeperRegistry {
-    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
-        todo!()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;


Reply via email to