This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 54181bf Feat/cluster (#138)
54181bf is described below
commit 54181bf417536e75ed6a4ff49efaf7a5911fd5d3
Author: Yang Yang <[email protected]>
AuthorDate: Tue May 16 21:29:31 2023 +0800
Feat/cluster (#138)
* refactor(cluster): add Cluster MockImpl
* refactor(triple): use ClientBuilder to init Cluster ability
* Update builder.rs
update default direct value
* Update triple.rs
handle unused var
* Update mod.rs
comment out some code
* refactor(triple): remove unused var in ClientBuilder
---
dubbo/Cargo.toml | 1 +
dubbo/src/cluster/mod.rs | 101 ++++++++++++++++++++++++----
dubbo/src/protocol/mod.rs | 12 +++-
dubbo/src/protocol/triple/triple_invoker.rs | 26 ++++---
dubbo/src/triple/client/builder.rs | 44 ++++++++----
dubbo/src/triple/client/triple.rs | 33 +++------
6 files changed, 159 insertions(+), 58 deletions(-)
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 94a2bd8..91b19d6 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,6 +34,7 @@ axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
+dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index b6d8a7c..4f73d2f 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,24 +15,41 @@
* limitations under the License.
*/
-use std::{sync::Arc, task::Poll};
+use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};
use aws_smithy_http::body::SdkBody;
-use tower_service::Service;
+use dubbo_base::Url;
-use crate::{empty_body, protocol::BoxInvoker};
+use crate::{
+ empty_body,
+ invocation::RpcInvocation,
+ protocol::{BoxInvoker, Invoker},
+};
pub mod directory;
pub mod loadbalance;
pub mod support;
-pub trait Directory {
- fn list(&self, meta: String) -> Vec<BoxInvoker>;
+pub trait Directory: Debug {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
fn is_empty(&self) -> bool;
}
-type BoxDirectory = Box<dyn Directory>;
+type BoxDirectory = Box<dyn Directory + Send + Sync>;
+pub trait Cluster {
+ fn join(&self, dir: BoxDirectory) -> BoxInvoker;
+}
+
+#[derive(Debug, Default)]
+pub struct MockCluster {}
+
+impl Cluster for MockCluster {
+ fn join(&self, dir: BoxDirectory) -> BoxInvoker {
+ Box::new(FailoverCluster::new(dir))
+ }
+}
+#[derive(Clone, Debug)]
pub struct FailoverCluster {
dir: Arc<BoxDirectory>,
}
@@ -43,7 +60,7 @@ impl FailoverCluster {
}
}
-impl Service<http::Request<SdkBody>> for FailoverCluster {
+impl Invoker<http::Request<SdkBody>> for FailoverCluster {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
@@ -66,7 +83,11 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
.method(req.method().clone());
*clone_req.headers_mut().unwrap() = req.headers().clone();
let r = clone_req.body(clone_body).unwrap();
- let invokers = self.dir.list("service_name".to_string());
+ let invokers = self.dir.list(
+ RpcInvocation::default()
+ .with_service_unique_name("hello".to_string())
+ .into(),
+ );
for mut invoker in invokers {
let fut = async move {
let res = invoker.call(r).await;
@@ -83,19 +104,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
.unwrap())
})
}
+
+ fn get_url(&self) -> dubbo_base::Url {
+ Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
+ }
+}
+
+#[derive(Debug, Default)]
+pub struct MockDirectory {
+ // router_chain: RouterChain,
+ invokers: Vec<BoxInvoker>,
}
-pub struct MockDirectory {}
+impl MockDirectory {
+ pub fn new(invokers: Vec<BoxInvoker>) -> MockDirectory {
+ Self {
+ // router_chain: RouterChain::default(),
+ invokers,
+ }
+ }
+}
impl Directory for MockDirectory {
- fn list(&self, _meta: String) -> Vec<BoxInvoker> {
+ fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
// tracing::info!("MockDirectory: {}", meta);
- // let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+ let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
// vec![Box::new(TripleInvoker::new(u))]
- todo!()
+ // self.router_chain.route(u, invo);
+ self.invokers.clone()
}
fn is_empty(&self) -> bool {
false
}
}
+
+#[derive(Debug, Default)]
+pub struct RouterChain {
+ router: HashMap<String, BoxRouter>,
+ invokers: Vec<BoxInvoker>,
+}
+
+impl RouterChain {
+ pub fn route(&self, url: Url, invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
+ let r = self.router.get("mock").unwrap();
+ r.route(self.invokers.clone(), url, invo)
+ }
+}
+
+pub trait Router: Debug {
+ fn route(
+ &self,
+ invokers: Vec<BoxInvoker>,
+ url: Url,
+ invo: Arc<RpcInvocation>,
+ ) -> Vec<BoxInvoker>;
+}
+
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
+
+#[derive(Debug, Default)]
+pub struct MockRouter {}
+
+impl Router for MockRouter {
+ fn route(
+ &self,
+ invokers: Vec<BoxInvoker>,
+ _url: Url,
+ _invo: Arc<RpcInvocation>,
+ ) -> Vec<BoxInvoker> {
+ invokers
+ }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 58dca5f..145bcc8 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -23,6 +23,7 @@ use std::{
use async_trait::async_trait;
use aws_smithy_http::body::SdkBody;
+use dyn_clone::DynClone;
use tower_service::Service;
use dubbo_base::Url;
@@ -43,7 +44,7 @@ pub trait Exporter {
fn unexport(&self);
}
-pub trait Invoker<ReqBody>: Debug {
+pub trait Invoker<ReqBody>: Debug + DynClone {
type Response;
type Error;
@@ -68,6 +69,15 @@ pub type BoxInvoker = Box<
+ Sync,
>;
+dyn_clone::clone_trait_object!(
+ Invoker<
+ http::Request<SdkBody>,
+ Response = http::Response<crate::BoxBody>,
+ Error = crate::Error,
+ Future = crate::BoxFuture<http::Response<crate::BoxBody>, crate::Error>,
+ >
+);
+
pub struct WrapperInvoker<T>(T);
impl<T, ReqBody> Service<http::Request<ReqBody>> for WrapperInvoker<T>
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 6139cc9..fb661f9 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -17,24 +17,32 @@
use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
-use std::fmt::{Debug, Formatter};
+use std::{
+ fmt::{Debug, Formatter},
+ str::FromStr,
+};
use tower_service::Service;
-use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
+use crate::{
+ protocol::Invoker,
+ triple::{client::builder::ClientBoxService, transport::connection::Connection},
+ utils::boxed_clone::BoxCloneService,
+};
+#[derive(Clone)]
pub struct TripleInvoker {
url: Url,
conn: ClientBoxService,
}
impl TripleInvoker {
- // pub fn new(url: Url) -> TripleInvoker {
- // let uri = http::Uri::from_str(&url.to_url()).unwrap();
- // Self {
- // url,
- // conn: ClientBuilder::from_uri(&uri).build()connect(),
- // }
- // }
+ pub fn new(url: Url) -> TripleInvoker {
+ let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ Self {
+ url,
+ conn: BoxCloneService::new(Connection::new().with_host(uri)),
+ }
+ }
}
impl Debug for TripleInvoker {
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index cf667cc..29957a6 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -16,18 +16,19 @@
*/
use crate::{
- cluster::directory::StaticDirectory,
- codegen::{ClusterInvoker, Directory, RegistryDirectory},
+ cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory},
+ codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker},
triple::compression::CompressionEncoding,
- utils::boxed::BoxService,
+ utils::boxed_clone::BoxCloneService,
};
use aws_smithy_http::body::SdkBody;
+use dubbo_base::Url;
use super::TripleClient;
pub type ClientBoxService =
- BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
+ BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
@@ -35,6 +36,8 @@ pub struct ClientBuilder {
pub connector: &'static str,
directory: Option<Box<dyn Directory>>,
cluster_invoker: Option<ClusterInvoker>,
+ pub direct: bool,
+ host: String,
}
impl ClientBuilder {
@@ -44,6 +47,8 @@ impl ClientBuilder {
connector: "",
directory: None,
cluster_invoker: None,
+ direct: false,
+ host: "".to_string(),
}
}
@@ -53,15 +58,8 @@ impl ClientBuilder {
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
cluster_invoker: None,
- }
- }
-
- pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
- Self {
- timeout: None,
- connector: "",
- directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
- cluster_invoker: None,
+ direct: true,
+ host: host.clone().to_string(),
}
}
@@ -104,11 +102,29 @@ impl ClientBuilder {
}
}
+ pub fn with_direct(self, direct: bool) -> Self {
+ Self { direct, ..self }
+ }
+
pub fn build(self) -> TripleClient {
- TripleClient {
+ let mut cli = TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: self.directory,
cluster_invoker: self.cluster_invoker,
+ invoker: None,
+ };
+ if self.direct {
+ cli.invoker = Some(Box::new(TripleInvoker::new(
+ Url::from_url(&self.host).unwrap(),
+ )));
+ return cli;
}
+
+ let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new(
+ TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()),
+ )])));
+
+ cli.invoker = Some(cluster);
+ cli
}
}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 56edb96..eb7934d 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -30,6 +30,7 @@ use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
use crate::{
cluster::support::cluster_invoker::ClusterRequestBuilder,
invocation::{IntoStreamingRequest, Metadata, Request, Response},
+ protocol::BoxInvoker,
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
};
@@ -38,11 +39,12 @@ pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
pub(crate) directory: Option<Box<dyn Directory>>,
pub(crate) cluster_invoker: Option<ClusterInvoker>,
+ pub invoker: Option<BoxInvoker>,
}
impl TripleClient {
pub fn connect(host: String) -> Self {
- let builder = ClientBuilder::from_static(&host);
+ let builder = ClientBuilder::from_static(&host).with_direct(true);
builder.build()
}
@@ -135,7 +137,7 @@ impl TripleClient {
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
- invocation: RpcInvocation,
+ _invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -150,27 +152,14 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let sdk_body = SdkBody::from(body);
- let arc_invocation = Arc::new(invocation);
- let req;
- let http_uri;
- if self.cluster_invoker.is_some() {
- let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
- req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
- http_uri = req.uri().clone();
- } else {
- let url_list = self
- .directory
- .as_ref()
- .expect("msg")
- .list(arc_invocation.clone());
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
- req = self.map_request(http_uri.clone(), path, sdk_body);
- }
+ let bytes = hyper::body::to_bytes(body).await.unwrap();
+ let sdk_body = SdkBody::from(bytes);
+
+ // let mut conn = Connection::new().with_host(http_uri);
+ let mut conn = self.invoker.clone().unwrap();
+ let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
- let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
.await