This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch feat/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/feat/cluster by this push:
new b2e829c Feat(cluster): Cluster Policy Impl (#146)
b2e829c is described below
commit b2e829cc397679a28a729ca7de7b59c391f00641
Author: Yang Yang <[email protected]>
AuthorDate: Tue Aug 1 10:40:30 2023 +0800
Feat(cluster): Cluster Policy Impl (#146)
* refactor(cluster): comment the logic of clone body
* Rft(triple): remove Clone of Invoker
* Rft(cluster): use ready_cache to manage Invokers, add ready_cache in
FailoverCluster
* Rft(protocol): use interface Inheritance to redesign Invoker
---------
Co-authored-by: G-XD <[email protected]>
Co-authored-by: GXD <[email protected]>
---
config/src/protocol.rs | 20 +++-
dubbo-build/src/client.rs | 5 -
dubbo/Cargo.toml | 7 +-
dubbo/src/cluster/directory.rs | 69 ++++--------
dubbo/src/cluster/mod.rs | 133 ++++++++++++++++-------
dubbo/src/cluster/support/cluster_invoker.rs | 147 --------------------------
dubbo/src/cluster/support/mod.rs | 20 ----
dubbo/src/codegen.rs | 7 +-
dubbo/src/protocol/mod.rs | 13 +--
dubbo/src/protocol/triple/triple_invoker.rs | 38 ++++---
dubbo/src/registry/integration.rs | 7 --
dubbo/src/registry/mod.rs | 17 ---
dubbo/src/triple/client/builder.rs | 64 ++++++------
dubbo/src/triple/client/triple.rs | 151 ++++++++++-----------------
examples/echo/Cargo.toml | 4 +-
examples/echo/src/echo/client.rs | 4 +-
examples/greeter/Cargo.toml | 2 +-
registry/zookeeper/src/lib.rs | 12 +--
18 files changed, 258 insertions(+), 462 deletions(-)
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 86ff053..e2340c1 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -77,10 +77,26 @@ impl ProtocolRetrieve for ProtocolConfig {
} else {
let result = self.get_protocol(protocol_key);
if let Some(..) = result {
- panic!("default triple base dose not defined.")
- } else {
result.unwrap()
+ } else {
+ panic!("default triple base dose not defined.")
}
}
}
}
+
+#[cfg(test)]
+mod tests {
+
+ use super::{ProtocolConfig, ProtocolRetrieve};
+
+ #[test]
+ #[should_panic(expected = "default triple base dose not defined")]
+ pub fn test_get_invalid_protocol() {
+ let config = ProtocolConfig::default();
+
+ let _ = config.get_protocol_or_default("");
+
+ ()
+ }
+}
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 1e1c9fb..bfcfe45 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,11 +90,6 @@ pub fn generate<T: Service>(
}
}
- pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self
{
- self.inner = self.inner.with_cluster(invoker);
- self
- }
-
#methods
}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 94a2bd8..51ccc19 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -10,11 +10,11 @@ repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-hyper = { version = "0.14.19", features = ["full"] }
+hyper = { version = "0.14.26", features = ["full"] }
http = "0.2"
tower-service.workspace = true
http-body = "0.4.4"
-tower = { workspace = true, features = ["timeout"] }
+tower = { workspace = true, features = ["timeout", "ready-cache"] }
futures-util = "0.3.23"
futures-core ="0.3.23"
argh = "0.1"
@@ -33,7 +33,8 @@ futures.workspace = true
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
-aws-smithy-http = "0.54.1"
+aws-smithy-http = "0.55.2"
+dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index e74fc6d..afe9657 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -23,39 +23,21 @@ use std::{
};
use crate::{
+ codegen::TripleInvoker,
invocation::{Invocation, RpcInvocation},
- registry::{memory_registry::MemoryNotifyListener, BoxRegistry,
RegistryWrapper},
+ protocol::BoxInvoker,
+ registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
};
use dubbo_base::Url;
use dubbo_logger::tracing;
+use crate::cluster::Directory;
+
/// Directory.
///
/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
-pub trait Directory: Debug + DirectoryClone {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
-}
-pub trait DirectoryClone {
- fn clone_box(&self) -> Box<dyn Directory>;
-}
-
-impl<T> DirectoryClone for T
-where
- T: 'static + Directory + Clone,
-{
- fn clone_box(&self) -> Box<dyn Directory> {
- Box::new(self.clone())
- }
-}
-
-impl Clone for Box<dyn Directory> {
- fn clone(&self) -> Box<dyn Directory> {
- self.clone_box()
- }
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct StaticDirectory {
uri: http::Uri,
}
@@ -78,7 +60,7 @@ impl StaticDirectory {
}
impl Directory for StaticDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
@@ -86,43 +68,28 @@ impl Directory for StaticDirectory {
invocation.get_target_service_unique_name(),
))
.unwrap();
- vec![url]
- }
-}
-
-impl DirectoryClone for StaticDirectory {
- fn clone_box(&self) -> Box<dyn Directory> {
- Box::new(StaticDirectory {
- uri: self.uri.clone(),
- })
+ let invoker = Box::new(TripleInvoker::new(url));
+ vec![invoker]
}
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct RegistryDirectory {
- registry: RegistryWrapper,
+ registry: Arc<BoxRegistry>,
service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
}
impl RegistryDirectory {
pub fn new(registry: BoxRegistry) -> RegistryDirectory {
RegistryDirectory {
- registry: RegistryWrapper {
- registry: Some(registry),
- },
+ registry: Arc::new(registry),
service_instances: Arc::new(RwLock::new(HashMap::new())),
}
}
}
-impl DirectoryClone for RegistryDirectory {
- fn clone_box(&self) -> Box<dyn Directory> {
- todo!()
- }
-}
-
impl Directory for RegistryDirectory {
- fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let service_name = invocation.get_target_service_unique_name();
let url = Url::from_url(&format!(
@@ -132,9 +99,6 @@ impl Directory for RegistryDirectory {
.unwrap();
self.registry
- .registry
- .as_ref()
- .expect("msg")
.subscribe(
url,
Arc::new(MemoryNotifyListener {
@@ -149,6 +113,11 @@ impl Directory for RegistryDirectory {
.expect("service_instances.read");
let binding = Vec::new();
let url_vec = map.get(&service_name).unwrap_or(&binding);
- url_vec.to_vec()
+ // url_vec.to_vec()
+ let mut invokers: Vec<BoxInvoker> = vec![];
+ for item in url_vec.iter() {
+ invokers.push(Box::new(TripleInvoker::new(item.clone())));
+ }
+ invokers
}
}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index b6d8a7c..afc3cc9 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,31 +15,54 @@
* limitations under the License.
*/
-use std::{sync::Arc, task::Poll};
+use std::{fmt::Debug, sync::Arc, task::Poll};
use aws_smithy_http::body::SdkBody;
+use dubbo_base::Url;
+use futures_util::TryFutureExt;
+use tower::ready_cache::ReadyCache;
use tower_service::Service;
-use crate::{empty_body, protocol::BoxInvoker};
+use crate::{
+ invocation::RpcInvocation,
+ protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
+};
pub mod directory;
pub mod loadbalance;
-pub mod support;
-pub trait Directory {
- fn list(&self, meta: String) -> Vec<BoxInvoker>;
- fn is_empty(&self) -> bool;
+pub trait Directory: Debug {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
}
-type BoxDirectory = Box<dyn Directory>;
+type BoxDirectory = Box<dyn Directory + Send + Sync>;
+pub trait Cluster {
+ fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+}
+
+#[derive(Debug, Default)]
+pub struct MockCluster {}
+
+impl Cluster for MockCluster {
+ fn join(&self, dir: BoxDirectory) -> BoxInvoker {
+ Box::new(FailoverCluster::new(dir))
+ }
+}
+
+// 在Cluster上进行缓存Service
+#[derive(Debug)]
pub struct FailoverCluster {
dir: Arc<BoxDirectory>,
+ caches: ReadyCache<usize, BoxInvoker, http::Request<SdkBody>>,
}
impl FailoverCluster {
pub fn new(dir: BoxDirectory) -> FailoverCluster {
- Self { dir: Arc::new(dir) }
+ Self {
+ dir: Arc::new(dir),
+ caches: ReadyCache::default(),
+ }
}
}
@@ -59,43 +82,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
}
fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
- println!("req: {}", req.body().content_length().unwrap());
- let clone_body = req.body().try_clone().unwrap();
- let mut clone_req = http::Request::builder()
- .uri(req.uri().clone())
- .method(req.method().clone());
- *clone_req.headers_mut().unwrap() = req.headers().clone();
- let r = clone_req.body(clone_body).unwrap();
- let invokers = self.dir.list("service_name".to_string());
- for mut invoker in invokers {
- let fut = async move {
- let res = invoker.call(r).await;
- return res;
- };
- return Box::pin(fut);
+ // let clone_body = req.body().try_clone().unwrap();
+ // let mut clone_req = http::Request::builder()
+ // .uri(req.uri().clone())
+ // .method(req.method().clone());
+ // *clone_req.headers_mut().unwrap() = req.headers().clone();
+ // let r = clone_req.body(clone_body).unwrap();
+ let invokers = self.dir.list(
+ RpcInvocation::default()
+ .with_service_unique_name("hello".to_string())
+ .into(),
+ );
+ let mut i: usize = 0;
+ for invoker in invokers {
+ self.caches.push(i, invoker);
+ i += 1;
}
- Box::pin(async move {
- Ok(http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap())
- })
+
+ Box::pin(self.caches.call_ready_index(0, req).map_err(Into::into))
}
}
-pub struct MockDirectory {}
+impl Invoker<http::Request<SdkBody>> for FailoverCluster {
+ fn get_url(&self) -> dubbo_base::Url {
+ Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
+ }
+}
-impl Directory for MockDirectory {
- fn list(&self, _meta: String) -> Vec<BoxInvoker> {
- // tracing::info!("MockDirectory: {}", meta);
- // let u =
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
- // vec![Box::new(TripleInvoker::new(u))]
- todo!()
+#[derive(Debug, Default)]
+pub struct MockDirectory {
+ // router_chain: RouterChain,
+}
+
+impl MockDirectory {
+ pub fn new() -> MockDirectory {
+ Self {
+ // router_chain: RouterChain::default(),
+ }
}
+}
- fn is_empty(&self) -> bool {
- false
+impl Directory for MockDirectory {
+ fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
+ // tracing::info!("MockDirectory: {}", meta);
+ let u =
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+ vec![Box::new(TripleInvoker::new(u))]
+ // self.router_chain.route(u, invo);
}
}
+
+// #[derive(Debug, Default)]
+// pub struct RouterChain {
+// router: HashMap<String, BoxRouter>,
+// invokers: Arc<Vec<BoxInvoker>>,
+// }
+
+// impl RouterChain {
+// pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) ->
Arc<Vec<BoxInvoker>> {
+// let r = self.router.get("mock").unwrap();
+// r.route(self.invokers.clone(), url, invo)
+// }
+// }
+
+// pub trait Router: Debug {
+// fn route(
+// &self,
+// invokers: Arc<Vec<BoxInvoker>>,
+// url: Url,
+// invo: Arc<RpcInvocation>,
+// ) -> Arc<Vec<BoxInvoker>>;
+// }
+
+// pub type BoxRouter = Box<dyn Router + Sync + Send>;
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs
b/dubbo/src/cluster/support/cluster_invoker.rs
deleted file mode 100644
index 0ccca48..0000000
--- a/dubbo/src/cluster/support/cluster_invoker.rs
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use aws_smithy_http::body::SdkBody;
-use std::{str::FromStr, sync::Arc};
-
-use dubbo_base::Url;
-use http::{uri::PathAndQuery, Request};
-
-use crate::{
- cluster::{
- loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
- support::DEFAULT_LOADBALANCE,
- },
- codegen::{Directory, RegistryDirectory, TripleClient},
- invocation::RpcInvocation,
-};
-
-#[derive(Debug, Clone)]
-pub struct ClusterInvoker {
- directory: Arc<RegistryDirectory>,
- destroyed: bool,
-}
-
-pub trait ClusterInvokerSelector {
- /// Select a invoker using loadbalance policy.
- fn select(
- &self,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- excluded: Arc<Vec<Url>>,
- ) -> Option<Url>;
-
- fn do_select(
- &self,
- loadbalance_key: Option<&str>,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- ) -> Option<Url>;
-}
-
-pub trait ClusterRequestBuilder {
- fn build_req(
- &self,
- triple_client: &mut TripleClient,
- path: http::uri::PathAndQuery,
- invocation: Arc<RpcInvocation>,
- body: SdkBody,
- ) -> http::Request<SdkBody>;
-}
-
-impl ClusterInvoker {
- pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
- ClusterInvoker {
- directory: Arc::new(registry_directory),
- destroyed: false,
- }
- }
-
- pub fn directory(&self) -> Arc<RegistryDirectory> {
- self.directory.clone()
- }
-
- pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
- if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
- LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
- } else {
- println!(
- "loadbalance {} not found, use default loadbalance {}",
- loadbalance_key, DEFAULT_LOADBALANCE
- );
- LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
- }
- }
-
- pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
- !self.destroyed() && !self.directory.list(invocation).is_empty()
- }
-
- pub fn destroyed(&self) -> bool {
- self.destroyed
- }
-}
-
-impl ClusterInvokerSelector for ClusterInvoker {
- fn select(
- &self,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- _excluded: Arc<Vec<Url>>,
- ) -> Option<Url> {
- if invokers.is_empty() {
- return None;
- }
- let instance_count = invokers.len();
- return if instance_count == 1 {
- Some(invokers.as_ref().first()?.clone())
- } else {
- let loadbalance = Some(DEFAULT_LOADBALANCE);
- self.do_select(loadbalance, invocation, invokers)
- };
- }
-
- /// picking instance invoker url from registry directory
- fn do_select(
- &self,
- loadbalance_key: Option<&str>,
- invocation: Arc<RpcInvocation>,
- invokers: Arc<Vec<Url>>,
- ) -> Option<Url> {
- let loadbalance =
self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
- loadbalance.select(invokers, None, invocation)
- }
-}
-
-impl ClusterRequestBuilder for ClusterInvoker {
- fn build_req(
- &self,
- triple_client: &mut TripleClient,
- path: PathAndQuery,
- invocation: Arc<RpcInvocation>,
- body: SdkBody,
- ) -> Request<SdkBody> {
- let invokers = self.directory.list(invocation.clone());
- let invoker_url = self
- .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
- .expect("no valid provider");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip,
invoker_url.port))
- .unwrap();
- triple_client.map_request(http_uri, path, body)
- }
-}
diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs
deleted file mode 100644
index ae42cc2..0000000
--- a/dubbo/src/cluster/support/mod.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-pub mod cluster_invoker;
-
-pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 98d784f..412feb9 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,14 +27,11 @@ pub use hyper::Body as hyperBody;
pub use tower_service::Service;
pub use super::{
- cluster::{
- directory::{Directory, RegistryDirectory},
- support::cluster_invoker::ClusterInvoker,
- },
+ cluster::directory::RegistryDirectory,
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
- registry::{BoxRegistry, Registry, RegistryWrapper},
+ registry::{BoxRegistry, Registry},
triple::{
client::TripleClient,
codec::{prost::ProstCodec, Codec},
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 58dca5f..4dceb45 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -17,7 +17,6 @@
use std::{
fmt::Debug,
- future::Future,
task::{Context, Poll},
};
@@ -43,18 +42,8 @@ pub trait Exporter {
fn unexport(&self);
}
-pub trait Invoker<ReqBody>: Debug {
- type Response;
-
- type Error;
-
- type Future: Future<Output = Result<Self::Response, Self::Error>>;
-
+pub trait Invoker<ReqBody>: Debug + Service<ReqBody> {
fn get_url(&self) -> Url;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),
Self::Error>>;
-
- fn call(&mut self, req: ReqBody) -> Self::Future;
}
pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs
b/dubbo/src/protocol/triple/triple_invoker.rs
index 6139cc9..db18f5f 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -17,24 +17,32 @@
use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
-use std::fmt::{Debug, Formatter};
+use std::{
+ fmt::{Debug, Formatter},
+ str::FromStr,
+};
use tower_service::Service;
-use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
+use crate::{
+ protocol::Invoker,
+ triple::{client::builder::ClientBoxService,
transport::connection::Connection},
+ utils::boxed_clone::BoxCloneService,
+};
+#[derive(Clone)]
pub struct TripleInvoker {
url: Url,
conn: ClientBoxService,
}
impl TripleInvoker {
- // pub fn new(url: Url) -> TripleInvoker {
- // let uri = http::Uri::from_str(&url.to_url()).unwrap();
- // Self {
- // url,
- // conn: ClientBuilder::from_uri(&uri).build()connect(),
- // }
- // }
+ pub fn new(url: Url) -> TripleInvoker {
+ let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ Self {
+ url,
+ conn: BoxCloneService::new(Connection::new().with_host(uri)),
+ }
+ }
}
impl Debug for TripleInvoker {
@@ -43,17 +51,13 @@ impl Debug for TripleInvoker {
}
}
-impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+impl Service<http::Request<SdkBody>> for TripleInvoker {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
type Future = crate::BoxFuture<Self::Response, Self::Error>;
- fn get_url(&self) -> Url {
- self.url.clone()
- }
-
fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
self.conn.call(req)
}
@@ -65,3 +69,9 @@ impl Invoker<http::Request<SdkBody>> for TripleInvoker {
self.conn.poll_ready(cx)
}
}
+
+impl Invoker<http::Request<SdkBody>> for TripleInvoker {
+ fn get_url(&self) -> Url {
+ self.url.clone()
+ }
+}
diff --git a/dubbo/src/registry/integration.rs
b/dubbo/src/registry/integration.rs
index 15b82d0..2944f98 100644
--- a/dubbo/src/registry/integration.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,10 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use crate::{cluster::support::cluster_invoker::ClusterInvoker,
registry::BoxRegistry};
-use std::sync::Arc;
-
-pub trait ClusterRegistryIntegration {
- /// get cluster invoker struct
- fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
-}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 31106f0..2a95452 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -60,20 +60,3 @@ impl Debug for BoxRegistry {
f.write_str("BoxRegistry")
}
}
-
-#[derive(Default)]
-pub struct RegistryWrapper {
- pub registry: Option<Box<dyn Registry>>,
-}
-
-impl Clone for RegistryWrapper {
- fn clone(&self) -> Self {
- Self { registry: None }
- }
-}
-
-impl Debug for RegistryWrapper {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("RegistryWrapper").finish()
- }
-}
diff --git a/dubbo/src/triple/client/builder.rs
b/dubbo/src/triple/client/builder.rs
index cf667cc..d68067a 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,26 +15,28 @@
* limitations under the License.
*/
+use std::sync::Arc;
+
use crate::{
- cluster::directory::StaticDirectory,
- codegen::{ClusterInvoker, Directory, RegistryDirectory},
- triple::compression::CompressionEncoding,
- utils::boxed::BoxService,
+ cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster,
MockDirectory},
+ codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+ protocol::BoxInvoker,
+ utils::boxed_clone::BoxCloneService,
};
use aws_smithy_http::body::SdkBody;
-
-use super::TripleClient;
+use dubbo_base::Url;
pub type ClientBoxService =
- BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>,
crate::Error>;
+ BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>,
crate::Error>;
#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
pub timeout: Option<u64>,
pub connector: &'static str,
- directory: Option<Box<dyn Directory>>,
- cluster_invoker: Option<ClusterInvoker>,
+ directory: Option<Arc<Box<dyn Directory>>>,
+ pub direct: bool,
+ host: String,
}
impl ClientBuilder {
@@ -43,7 +45,8 @@ impl ClientBuilder {
timeout: None,
connector: "",
directory: None,
- cluster_invoker: None,
+ direct: false,
+ host: "".to_string(),
}
}
@@ -51,17 +54,9 @@ impl ClientBuilder {
Self {
timeout: None,
connector: "",
- directory: Some(Box::new(StaticDirectory::new(&host))),
- cluster_invoker: None,
- }
- }
-
- pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
- Self {
- timeout: None,
- connector: "",
- directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
- cluster_invoker: None,
+ directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))),
+ direct: true,
+ host: host.clone().to_string(),
}
}
@@ -75,23 +70,21 @@ impl ClientBuilder {
/// host: http://0.0.0.0:8888
pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
Self {
- directory: Some(directory),
- cluster_invoker: None,
+ directory: Some(Arc::new(directory)),
..self
}
}
pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
Self {
- directory: None,
- cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
+ directory: Some(Arc::new(Box::new(registry))),
..self
}
}
pub fn with_host(self, host: &'static str) -> Self {
Self {
- directory: Some(Box::new(StaticDirectory::new(&host))),
+ directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))),
..self
}
}
@@ -99,16 +92,23 @@ impl ClientBuilder {
pub fn with_connector(self, connector: &'static str) -> Self {
Self {
connector: connector,
- cluster_invoker: None,
..self
}
}
- pub fn build(self) -> TripleClient {
- TripleClient {
- send_compression_encoding: Some(CompressionEncoding::Gzip),
- directory: self.directory,
- cluster_invoker: self.cluster_invoker,
+ pub fn with_direct(self, direct: bool) -> Self {
+ Self { direct, ..self }
+ }
+
+ pub fn build(self, _invocation: Arc<RpcInvocation>) -> Option<BoxInvoker> {
+ if self.direct {
+ return Some(Box::new(TripleInvoker::new(
+ Url::from_url(&self.host).unwrap(),
+ )));
}
+
+ let cluster =
MockCluster::default().join(Box::new(MockDirectory::new()));
+
+ return Some(cluster);
}
}
diff --git a/dubbo/src/triple/client/triple.rs
b/dubbo/src/triple/client/triple.rs
index 56edb96..b81661c 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,46 +15,41 @@
* limitations under the License.
*/
-use std::{str::FromStr, sync::Arc};
+use std::str::FromStr;
use futures_util::{future, stream, StreamExt, TryStreamExt};
use aws_smithy_http::body::SdkBody;
use http::HeaderValue;
-use rand::prelude::SliceRandom;
-use tower_service::Service;
-use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
+use super::builder::ClientBuilder;
+use crate::codegen::RpcInvocation;
use crate::{
- cluster::support::cluster_invoker::ClusterRequestBuilder,
invocation::{IntoStreamingRequest, Metadata, Request, Response},
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding,
encode::encode},
};
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Default, Clone)]
pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
- pub(crate) directory: Option<Box<dyn Directory>>,
- pub(crate) cluster_invoker: Option<ClusterInvoker>,
+ pub(crate) builder: Option<ClientBuilder>,
}
impl TripleClient {
pub fn connect(host: String) -> Self {
- let builder = ClientBuilder::from_static(&host);
+ let builder = ClientBuilder::from_static(&host).with_direct(true);
- builder.build()
+ TripleClient {
+ send_compression_encoding: Some(CompressionEncoding::Gzip),
+ builder: Some(builder),
+ }
}
pub fn new(builder: ClientBuilder) -> Self {
- builder.build()
- }
-
- pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
TripleClient {
- cluster_invoker: Some(invoker),
- ..self
+ send_compression_encoding: Some(CompressionEncoding::Gzip),
+ builder: Some(builder),
}
}
@@ -150,27 +145,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
+ let bytes = hyper::body::to_bytes(body).await.unwrap();
+ let sdk_body = SdkBody::from(bytes);
+
+ let mut conn = self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap();
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
- let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
.await
@@ -225,25 +212,17 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+
+ let mut conn = self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap();
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
let response = conn
.call(req)
.await
@@ -282,25 +261,18 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+
+ let mut conn = self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap();
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
+ // let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
.await
@@ -355,26 +327,17 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
- let mut conn = Connection::new().with_host(http_uri);
+ let mut conn = self
+ .builder
+ .clone()
+ .unwrap()
+ .build(invocation.into())
+ .unwrap();
+
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
+
let response = conn
.call(req)
.await
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 319c17b..bc6638b 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -30,9 +30,7 @@ async-trait = "0.1.56"
tokio-stream = "0.1"
dubbo-logger.workspace=true
-hyper = { version = "0.14.19", features = ["full"]}
-
-dubbo = {path = "../../dubbo", version = "0.3.0" }
+dubbo = {path = "../../dubbo"}
dubbo-config = {path = "../../config", version = "0.3.0" }
registry-zookeeper.workspace=true
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index db46958..0a2f150 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -34,7 +34,9 @@ async fn main() {
// let builder = ClientBuilder::new()
// .with_connector("unix")
// .with_host("unix://127.0.0.1:8888");
- let builder =
ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
+ let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888")
+ .with_timeout(1000000)
+ .with_direct(true);
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli =
EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 6ad3b1b..d68cab7 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -29,7 +29,7 @@ prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
dubbo-logger = { path = "../../common/logger" }
-dubbo = { path = "../../dubbo", version = "0.3.0" }
+dubbo = { path = "../../dubbo"}
dubbo-config = { path = "../../config", version = "0.3.0" }
registry-zookeeper.workspace = true
registry-nacos.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 5debc0a..e8c2c5c 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -34,11 +34,9 @@ use serde::{Deserialize, Serialize};
use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher,
ZooKeeper};
use dubbo::{
- cluster::support::cluster_invoker::ClusterInvoker,
- codegen::BoxRegistry,
registry::{
- integration::ClusterRegistryIntegration,
memory_registry::MemoryRegistry, NotifyListener,
- Registry, RegistryNotifyListener, ServiceEvent,
+ memory_registry::MemoryRegistry, NotifyListener, Registry,
RegistryNotifyListener,
+ ServiceEvent,
},
StdError,
};
@@ -371,12 +369,6 @@ impl NotifyListener for ServiceInstancesChangedListener {
}
}
-impl ClusterRegistryIntegration for ZookeeperRegistry {
- fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
- todo!()
- }
-}
-
#[cfg(test)]
mod tests {
use std::sync::Arc;