This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b98d724 Rft: Merge service_discovery branch to main (#114)
b98d724 is described below
commit b98d724ef4c130f6321f67c6c707e4766fe69660
Author: 墨舟 <[email protected]>
AuthorDate: Mon Feb 20 11:42:32 2023 +0800
Rft: Merge service_discovery branch to main (#114)
* implement service discovery
* update github action to run ci (#63)
* update github action to run ci
* update github action, copy from main branch
* load balance and service registry closed loop (#105)
* feat(cluster): loadbalance types
* feat(rpc): types alias for cluster invoker
* feat(cluster): integration
* feat(cluster): integration with examples
* feat(cluster.loadbalance): greeter example with default random
loadbalance passed
* feat(cluster.loadbalance): completing roundrobin arithmetic and fixing
compile warns.
* typo
* fix compile warns
* fix rustfmt check fails.
* fix cargo check fails(due to the use of nightly channel locally).
* fix: default yaml config parse failed in ci.
* ci actions zk test
* feat(registry): zk support
* feat(registry): zk support, connected to zk
* feat(registry): provider.services key as service name
* feat(registry): serviceKey and configuration files, aligned to dubbo
ecology
* feat(commons): tested Url impl
* feat(commons): tested Url impl
* feat(zk): interface service discovery
* feat(zk): create_path_with_parent_check
* feat(zk): export bug fixed.
* feat: merged branch main to service_discovery
* cargo fmt
* Rft: merge from main branch.
---------
Co-authored-by: luyanbo <[email protected]>
Co-authored-by: Ken Liu <[email protected]>
Co-authored-by: Yang Yang <[email protected]>
---
.github/workflows/github-actions.yml | 12 +-
Cargo.toml | 2 +-
config/src/config.rs | 68 ++--
config/src/lib.rs | 1 +
config/src/protocol.rs | 54 +--
config/src/provider.rs | 3 +-
config/src/{lib.rs => registry.rs} | 21 +-
config/src/service.rs | 33 +-
dubbo-build/src/client.rs | 21 +-
dubbo/Cargo.toml | 2 +
dubbo/src/cluster/directory.rs | 8 +-
.../consts.rs => cluster/loadbalance/impls/mod.rs} | 6 +-
dubbo/src/cluster/loadbalance/impls/random.rs | 59 +++
dubbo/src/cluster/loadbalance/impls/roundrobin.rs | 85 +++++
.../lib.rs => dubbo/src/cluster/loadbalance/mod.rs | 34 +-
.../src/cluster/loadbalance/types.rs | 31 +-
dubbo/src/cluster/mod.rs | 17 +-
dubbo/src/cluster/support/cluster_invoker.rs | 147 ++++++++
.../{common/consts.rs => cluster/support/mod.rs} | 6 +-
dubbo/src/codegen.rs | 5 +-
dubbo/src/common/consts.rs | 12 +
dubbo/src/common/url.rs | 207 ++++++++---
dubbo/src/framework.rs | 143 ++++----
dubbo/src/invocation.rs | 8 +-
dubbo/src/protocol/mod.rs | 9 +-
dubbo/src/protocol/triple/triple_invoker.rs | 29 +-
dubbo/src/protocol/triple/triple_protocol.rs | 8 +-
.../lib.rs => dubbo/src/registry/integration.rs | 19 +-
dubbo/src/registry/memory_registry.rs | 15 +-
dubbo/src/registry/mod.rs | 18 +-
dubbo/src/registry/protocol.rs | 42 ++-
dubbo/src/registry/types.rs | 91 +++++
dubbo/src/triple/client/builder.rs | 21 +-
dubbo/src/triple/client/triple.rs | 128 ++++---
dubbo/src/triple/compression.rs | 4 +-
dubbo/src/triple/server/builder.rs | 8 +-
dubbo/src/triple/transport/connection.rs | 2 +
examples/echo/src/echo/server.rs | 36 +-
examples/echo/src/generated/grpc.examples.echo.rs | 12 +-
examples/greeter/Cargo.toml | 1 -
examples/greeter/dubbo.yaml | 34 +-
examples/greeter/src/greeter/client.rs | 7 +-
examples/greeter/src/greeter/server.rs | 51 +--
registry-nacos/src/nacos_registry.rs | 18 +-
registry-nacos/src/utils/mod.rs | 14 +-
registry-zookeeper/Cargo.toml | 3 +-
registry-zookeeper/src/zookeeper_registry.rs | 400 +++++++++++++++------
scripts/ci-check.sh | 23 ++
48 files changed, 1383 insertions(+), 595 deletions(-)
diff --git a/.github/workflows/github-actions.yml
b/.github/workflows/github-actions.yml
index 9e53db5..e8881e0 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -52,6 +52,13 @@ jobs:
example-greeter:
name: example/greeter
runs-on: ubuntu-latest
+ services:
+ zoo1:
+ image: zookeeper:3.8
+ ports:
+ - 2181:2181
+ env:
+ ZOO_MY_ID: 1
steps:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
@@ -83,6 +90,9 @@ jobs:
- name: example greeter
run: |
../../target/debug/greeter-server &
- sleep 1s ;
+ sleep 3
../../target/debug/greeter-client
+ env:
+ ZOOKEEPER_SERVERS: 127.0.0.1:2181
+ DUBBO_CONFIG_PATH: ./dubbo.yaml
working-directory: examples/greeter
diff --git a/Cargo.toml b/Cargo.toml
index 785e903..d4404f2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,5 +10,5 @@ members = [
"dubbo",
"examples/echo",
"examples/greeter",
- "dubbo-build",
+ "dubbo-build"
]
diff --git a/config/src/config.rs b/config/src/config.rs
index 88bf0de..ed3a714 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,33 +17,33 @@
use std::{collections::HashMap, env, fs};
+use crate::{protocol::Protocol, registry::RegistryConfig};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
+use tracing::trace;
use super::{protocol::ProtocolConfig, provider::ProviderConfig,
service::ServiceConfig};
pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
pub static GLOBAL_ROOT_CONFIG: OnceCell<RootConfig> = OnceCell::new();
+pub const DUBBO_CONFIG_PREFIX: &str = "dubbo";
/// used to storage all structed config, from some source: cmd, file..;
/// Impl Config trait, business init by read Config trait
#[allow(dead_code)]
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct RootConfig {
- pub name: String,
-
#[serde(default)]
- pub service: HashMap<String, ServiceConfig>,
- pub protocols: HashMap<String, ProtocolConfig>,
+ pub protocols: ProtocolConfig,
#[serde(default)]
- pub registries: HashMap<String, String>,
+ pub provider: ProviderConfig,
#[serde(default)]
- pub provider: ProviderConfig,
+ pub registries: HashMap<String, RegistryConfig>,
- #[serde(skip_serializing, skip_deserializing)]
+ #[serde(default)]
pub data: HashMap<String, String>,
}
@@ -59,8 +59,6 @@ pub fn get_global_config() -> &'static RootConfig {
impl RootConfig {
pub fn new() -> Self {
Self {
- name: "dubbo".to_string(),
- service: HashMap::new(),
protocols: HashMap::new(),
registries: HashMap::new(),
provider: ProviderConfig::new(),
@@ -86,12 +84,11 @@ impl RootConfig {
tracing::info!("current path: {:?}", env::current_dir());
let data = fs::read(config_path)?;
- let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
+ trace!("config data: {:?}", String::from_utf8(data.clone()));
+ let conf: HashMap<String, RootConfig> =
serde_yaml::from_slice(&data).unwrap();
+ let root_config: RootConfig =
conf.get(DUBBO_CONFIG_PREFIX).unwrap().clone();
tracing::debug!("origin config: {:?}", conf);
- for (name, svc) in conf.service.iter_mut() {
- svc.name = name.to_string();
- }
- Ok(conf)
+ Ok(root_config)
}
pub fn test_config(&mut self) {
@@ -101,53 +98,30 @@ impl RootConfig {
let service_config = ServiceConfig::default()
.group("test".to_string())
- .serializer("json".to_string())
.version("1.0.0".to_string())
- .protocol_names("triple".to_string())
- // Currently, the hello_echo.rs doesn't support the url which like
- // `{protocol}/{service config name}/{service name}(e.g.
triple://0.0.0.0:8888/{service config name}/grpc.examples.echo.Echo).
- // So we comment this line.
- // .name("grpc.examples.echo.Echo".to_strdding())
- .registry("zookeeper".to_string());
-
- let triple_config = ProtocolConfig::default()
- .name("triple".to_string())
- .ip("0.0.0.0".to_string())
- .port("8888".to_string())
- .listener("tcp".to_string());
-
- let service_config =
service_config.add_protocol_configs(triple_config);
- self.service
+ .protocol("triple".to_string())
+ .interface("grpc.examples.echo.Echo".to_string());
+
+ self.provider
+ .services
.insert("grpc.examples.echo.Echo".to_string(), service_config);
- self.service.insert(
+ self.provider.services.insert(
"helloworld.Greeter".to_string(),
ServiceConfig::default()
.group("test".to_string())
- .serializer("json".to_string())
.version("1.0.0".to_string())
- // .name("helloworld.Greeter".to_string())
- .protocol_names("triple".to_string())
- .registry("zookeeper".to_string()),
+ .interface("helloworld.Greeter".to_string())
+ .protocol("triple".to_string()),
);
self.protocols.insert(
"triple".to_string(),
- ProtocolConfig::default()
+ Protocol::default()
.name("triple".to_string())
.ip("0.0.0.0".to_string())
- .port("8889".to_string())
- .listener("tcp".to_string()),
+ .port("8889".to_string()),
);
- provider.services = self.service.clone();
self.provider = provider.clone();
-
- let mut registries = HashMap::new();
- registries.insert(
- "zookeeper".to_string(),
- "zookeeper://localhost:2181".to_string(),
- );
- self.registries = registries;
-
println!("provider config: {:?}", provider);
// 通过环境变量读取某个文件。加在到内存中
self.data.insert(
diff --git a/config/src/lib.rs b/config/src/lib.rs
index 70a33cd..e56d933 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -18,6 +18,7 @@
pub mod config;
pub mod protocol;
pub mod provider;
+pub mod registry;
pub mod service;
pub use config::*;
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 632e912..cdc357a 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -19,18 +19,26 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
+pub const DEFAULT_PROTOCOL: &str = "triple";
+
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
-pub struct ProtocolConfig {
+pub struct Protocol {
pub ip: String,
pub port: String,
pub name: String,
- pub listener: String,
#[serde(skip_serializing, skip_deserializing)]
pub params: HashMap<String, String>,
}
-impl ProtocolConfig {
+pub type ProtocolConfig = HashMap<String, Protocol>;
+
+pub trait ProtocolRetrieve {
+ fn get_protocol(&self, protocol_key: &str) -> Option<Protocol>;
+ fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol;
+}
+
+impl Protocol {
pub fn name(self, name: String) -> Self {
Self { name, ..self }
}
@@ -43,30 +51,36 @@ impl ProtocolConfig {
Self { port, ..self }
}
- pub fn listener(self, listener: String) -> Self {
- Self { listener, ..self }
- }
-
pub fn params(self, params: HashMap<String, String>) -> Self {
Self { params, ..self }
}
- pub fn add_param(mut self, key: String, value: String) -> Self {
- self.params.insert(key, value);
- self
+ pub fn to_url(self) -> String {
+ format!("{}://{}:{}", self.name, self.ip, self.port)
}
+}
- pub fn to_url(&self) -> String {
- let mut params_vec: Vec<String> = Vec::new();
- for (k, v) in self.params.iter() {
- // let tmp = format!("{}={}", k, v);
- params_vec.push(format!("{}={}", k, v));
+impl ProtocolRetrieve for ProtocolConfig {
+ fn get_protocol(&self, protocol_key: &str) -> Option<Protocol> {
+ let result = self.get(protocol_key);
+ if let Some(..) = result {
+ Some(result.unwrap().clone())
+ } else {
+ None
}
- let param = params_vec.join("&");
+ }
- format!(
- "{}://{}:{}?listener={}&{}",
- self.name, self.ip, self.port, self.listener, param
- )
+ fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol {
+ let result = self.get_protocol(protocol_key);
+ if let Some(..) = result {
+ result.unwrap().clone()
+ } else {
+ let result = self.get_protocol(protocol_key);
+ if result.is_none() {
+ panic!("default triple protocol dose not defined.")
+ } else {
+ result.unwrap()
+ }
+ }
}
}
diff --git a/config/src/provider.rs b/config/src/provider.rs
index 120dc84..ccb9cf8 100644
--- a/config/src/provider.rs
+++ b/config/src/provider.rs
@@ -25,10 +25,9 @@ use super::service::ServiceConfig;
pub struct ProviderConfig {
#[serde(default)]
pub registry_ids: Vec<String>,
-
#[serde(default)]
pub protocol_ids: Vec<String>,
-
+ #[serde(default)]
pub services: HashMap<String, ServiceConfig>,
}
diff --git a/config/src/lib.rs b/config/src/registry.rs
similarity index 78%
copy from config/src/lib.rs
copy to config/src/registry.rs
index 70a33cd..c2348b0 100644
--- a/config/src/lib.rs
+++ b/config/src/registry.rs
@@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use serde::{Deserialize, Serialize};
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
-
-pub use config::*;
-
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
- }
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+pub struct RegistryConfig {
+ #[serde(default)]
+ pub protocol: String,
+ #[serde(default)]
+ pub address: String,
}
diff --git a/config/src/service.rs b/config/src/service.rs
index a876c0a..1f85a92 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -15,30 +15,19 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-
use serde::{Deserialize, Serialize};
-use super::protocol::ProtocolConfig;
-
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct ServiceConfig {
pub version: String,
pub group: String,
-
- #[serde(skip_serializing, skip_deserializing)]
- pub name: String,
pub protocol: String,
- pub registry: String,
- pub serializer: String,
-
- #[serde(default)]
- pub protocol_configs: HashMap<String, ProtocolConfig>,
+ pub interface: String,
}
impl ServiceConfig {
- pub fn name(self, name: String) -> Self {
- Self { name, ..self }
+ pub fn interface(self, interface: String) -> Self {
+ Self { interface, ..self }
}
pub fn version(self, version: String) -> Self {
@@ -49,24 +38,10 @@ impl ServiceConfig {
Self { group, ..self }
}
- pub fn protocol_names(self, protocol: String) -> Self {
+ pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
- pub fn serializer(self, serializer: String) -> Self {
- Self { serializer, ..self }
- }
-
- pub fn registry(self, registry: String) -> Self {
- Self { registry, ..self }
- }
-
- pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) ->
Self {
- self.protocol_configs
- .insert(protocol_config.name.clone(), protocol_config);
- Self { ..self }
- }
-
// pub fn get_url(&self) -> Vec<Url> {
// let mut urls = Vec::new();
// for (_, conf) in self.protocol_configs.iter() {
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 05e8e89..1e1c9fb 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,15 +90,10 @@ pub fn generate<T: Service>(
}
}
- // pub fn with_filter<F>(self, filter: F) ->
#service_ident<FilterService<T, F>>
- // where
- // F: Filter,
- // {
- // let inner = self.inner.with_filter(filter);
- // #service_ident {
- // inner,
- // }
- // }
+ pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self
{
+ self.inner = self.inner.with_cluster(invoker);
+ self
+ }
#methods
@@ -189,7 +184,7 @@ fn generate_unary<T: Method>(
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
- .with_servie_unique_name(String::from(#service_unique_name))
+ .with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.unary(
@@ -223,7 +218,7 @@ fn generate_server_streaming<T: Method>(
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
- .with_servie_unique_name(String::from(#service_unique_name))
+ .with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.server_streaming(
@@ -256,7 +251,7 @@ fn generate_client_streaming<T: Method>(
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
- .with_servie_unique_name(String::from(#service_unique_name))
+ .with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.client_streaming(
@@ -289,7 +284,7 @@ fn generate_streaming<T: Method>(
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
let invocation = RpcInvocation::default()
- .with_servie_unique_name(String::from(#service_unique_name))
+ .with_service_unique_name(String::from(#service_unique_name))
.with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner.bidi_streaming(
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index e319259..5369852 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,6 +34,8 @@ axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
+itertools = "0.10.1"
+urlencoding = "2.1.2"
dubbo-config = {path = "../config", version = "0.2.0"}
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index d1b5373..02946a9 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -32,7 +32,7 @@ use crate::{
///
/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
pub trait Directory: Debug + DirectoryClone {
- fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
}
pub trait DirectoryClone {
@@ -77,9 +77,9 @@ impl StaticDirectory {
}
impl Directory for StaticDirectory {
- fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
+ "tri://{}:{}/{}",
self.uri.host().unwrap(),
self.uri.port().unwrap(),
invocation.get_target_service_unique_name(),
@@ -121,7 +121,7 @@ impl DirectoryClone for RegistryDirectory {
}
impl Directory for RegistryDirectory {
- fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+ fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
let service_name = invocation.get_target_service_unique_name();
let url = Url::from_url(&format!(
diff --git a/dubbo/src/common/consts.rs
b/dubbo/src/cluster/loadbalance/impls/mod.rs
similarity index 85%
copy from dubbo/src/common/consts.rs
copy to dubbo/src/cluster/loadbalance/impls/mod.rs
index c05c2c7..5a84af8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/cluster/loadbalance/impls/mod.rs
@@ -14,7 +14,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
-pub const PROTOCOL: &str = "protocol";
-pub const REGISTRY: &str = "registry";
+pub mod random;
+pub mod roundrobin;
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs
b/dubbo/src/cluster/loadbalance/impls/random.rs
new file mode 100644
index 0000000..a5ca7df
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/random.rs
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+ fmt::{Debug, Formatter},
+ sync::Arc,
+};
+
+use crate::{
+ cluster::loadbalance::types::{LoadBalance, Metadata},
+ codegen::RpcInvocation,
+ common::url::Url,
+};
+
+pub struct RandomLoadBalance {
+ pub metadata: Metadata,
+}
+
+impl Default for RandomLoadBalance {
+ fn default() -> Self {
+ RandomLoadBalance {
+ metadata: Metadata::new("random"),
+ }
+ }
+}
+
+impl Debug for RandomLoadBalance {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "RandomLoadBalance")
+ }
+}
+
+impl LoadBalance for RandomLoadBalance {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ _url: Option<Url>,
+ _invocation: Arc<RpcInvocation>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let index = rand::random::<usize>() % invokers.len();
+ Some(invokers[index].clone())
+ }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
new file mode 100644
index 0000000..cd951bb
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+ collections::HashMap,
+ fmt::{Debug, Formatter},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc, RwLock,
+ },
+};
+
+use crate::{
+ cluster::loadbalance::types::{LoadBalance, Metadata},
+ codegen::RpcInvocation,
+ common::url::Url,
+};
+
+pub struct RoundRobinLoadBalance {
+ pub metadata: Metadata,
+ pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
+}
+
+impl Default for RoundRobinLoadBalance {
+ fn default() -> Self {
+ RoundRobinLoadBalance {
+ metadata: Metadata::new("roundrobin"),
+ counter_map: RwLock::new(HashMap::new()),
+ }
+ }
+}
+
+impl Debug for RoundRobinLoadBalance {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "RoundRobinLoadBalance")
+ }
+}
+
+impl RoundRobinLoadBalance {
+ fn guarantee_counter_key(&self, key: &str) {
+ let contained = self.counter_map.try_read().unwrap().contains_key(key);
+ if !contained {
+ self.counter_map
+ .try_write()
+ .unwrap()
+ .insert(key.to_string(), AtomicUsize::new(0));
+ }
+ }
+}
+
+impl LoadBalance for RoundRobinLoadBalance {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ _url: Option<Url>,
+ invocation: Arc<RpcInvocation>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let fingerprint = invocation.unique_fingerprint();
+ self.guarantee_counter_key(fingerprint.as_str());
+ let index = self
+ .counter_map
+ .try_read()
+ .unwrap()
+ .get(fingerprint.as_str())?
+ .fetch_add(1, Ordering::SeqCst)
+ % invokers.len();
+ Some(invokers[index].clone())
+ }
+}
diff --git a/config/src/lib.rs b/dubbo/src/cluster/loadbalance/mod.rs
similarity index 51%
copy from config/src/lib.rs
copy to dubbo/src/cluster/loadbalance/mod.rs
index 70a33cd..1d04f7f 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/cluster/loadbalance/mod.rs
@@ -14,19 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use std::collections::HashMap;
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
+use lazy_static::lazy_static;
-pub use config::*;
+use crate::cluster::loadbalance::{
+ impls::{random::RandomLoadBalance, roundrobin::RoundRobinLoadBalance},
+ types::BoxLoadBalance,
+};
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
- }
+pub mod impls;
+pub mod types;
+
+lazy_static! {
+ pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
+ init_loadbalance_extensions();
+}
+
+fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
+ let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
+ loadbalance_map.insert("random".to_string(),
Box::new(RandomLoadBalance::default()));
+ loadbalance_map.insert(
+ "roundrobin".to_string(),
+ Box::new(RoundRobinLoadBalance::default()),
+ );
+ loadbalance_map
}
diff --git a/config/src/lib.rs b/dubbo/src/cluster/loadbalance/types.rs
similarity index 63%
copy from config/src/lib.rs
copy to dubbo/src/cluster/loadbalance/types.rs
index 70a33cd..ac31176 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/cluster/loadbalance/types.rs
@@ -15,18 +15,27 @@
* limitations under the License.
*/
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
+use std::{fmt::Debug, sync::Arc};
-pub use config::*;
+use crate::{codegen::RpcInvocation, common::url::Url};
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
+pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
+
+pub trait LoadBalance: Debug {
+ fn select(
+ &self,
+ invokers: Arc<Vec<Url>>,
+ url: Option<Url>,
+ invocation: Arc<RpcInvocation>,
+ ) -> Option<Url>;
+}
+
+pub struct Metadata {
+ pub name: &'static str,
+}
+
+impl Metadata {
+ pub fn new(name: &'static str) -> Self {
+ Metadata { name }
}
}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 2221afc..b6d8a7c 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -20,13 +20,11 @@ use std::{sync::Arc, task::Poll};
use aws_smithy_http::body::SdkBody;
use tower_service::Service;
-use crate::{
- common::url::Url,
- empty_body,
- protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker},
-};
+use crate::{empty_body, protocol::BoxInvoker};
pub mod directory;
+pub mod loadbalance;
+pub mod support;
pub trait Directory {
fn list(&self, meta: String) -> Vec<BoxInvoker>;
@@ -90,10 +88,11 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
pub struct MockDirectory {}
impl Directory for MockDirectory {
- fn list(&self, meta: String) -> Vec<BoxInvoker> {
- tracing::info!("MockDirectory: {}", meta);
- let u =
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
- vec![Box::new(TripleInvoker::new(u))]
+ fn list(&self, _meta: String) -> Vec<BoxInvoker> {
+ // tracing::info!("MockDirectory: {}", meta);
+ // let u =
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+ // vec![Box::new(TripleInvoker::new(u))]
+ todo!()
}
fn is_empty(&self) -> bool {
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs
b/dubbo/src/cluster/support/cluster_invoker.rs
new file mode 100644
index 0000000..e00b849
--- /dev/null
+++ b/dubbo/src/cluster/support/cluster_invoker.rs
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use aws_smithy_http::body::SdkBody;
+use std::{str::FromStr, sync::Arc};
+
+use http::{uri::PathAndQuery, Request};
+
+use crate::{
+ cluster::{
+ loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
+ support::DEFAULT_LOADBALANCE,
+ },
+ codegen::{Directory, RegistryDirectory, TripleClient},
+ common::url::Url,
+ invocation::RpcInvocation,
+};
+
+#[derive(Debug, Clone)]
+pub struct ClusterInvoker {
+ directory: Arc<RegistryDirectory>,
+ destroyed: bool,
+}
+
+pub trait ClusterInvokerSelector {
+ /// Select a invoker using loadbalance policy.
+ fn select(
+ &self,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ excluded: Arc<Vec<Url>>,
+ ) -> Option<Url>;
+
+ fn do_select(
+ &self,
+ loadbalance_key: Option<&str>,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ ) -> Option<Url>;
+}
+
+pub trait ClusterRequestBuilder {
+ fn build_req(
+ &self,
+ triple_client: &mut TripleClient,
+ path: http::uri::PathAndQuery,
+ invocation: Arc<RpcInvocation>,
+ body: SdkBody,
+ ) -> http::Request<SdkBody>;
+}
+
+impl ClusterInvoker {
+ pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
+ ClusterInvoker {
+ directory: Arc::new(registry_directory),
+ destroyed: false,
+ }
+ }
+
+ pub fn directory(&self) -> Arc<RegistryDirectory> {
+ self.directory.clone()
+ }
+
+ pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
+ if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
+ LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
+ } else {
+ println!(
+ "loadbalance {} not found, use default loadbalance {}",
+ loadbalance_key, DEFAULT_LOADBALANCE
+ );
+ LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
+ }
+ }
+
+ pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
+ !self.destroyed() && !self.directory.list(invocation).is_empty()
+ }
+
+ pub fn destroyed(&self) -> bool {
+ self.destroyed
+ }
+}
+
+impl ClusterInvokerSelector for ClusterInvoker {
+ fn select(
+ &self,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ _excluded: Arc<Vec<Url>>,
+ ) -> Option<Url> {
+ if invokers.is_empty() {
+ return None;
+ }
+ let instance_count = invokers.len();
+ return if instance_count == 1 {
+ Some(invokers.as_ref().first()?.clone())
+ } else {
+ let loadbalance = Some(DEFAULT_LOADBALANCE);
+ self.do_select(loadbalance, invocation, invokers)
+ };
+ }
+
+ /// picking instance invoker url from registry directory
+ fn do_select(
+ &self,
+ loadbalance_key: Option<&str>,
+ invocation: Arc<RpcInvocation>,
+ invokers: Arc<Vec<Url>>,
+ ) -> Option<Url> {
+ let loadbalance =
self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
+ loadbalance.select(invokers, None, invocation)
+ }
+}
+
+impl ClusterRequestBuilder for ClusterInvoker {
+ fn build_req(
+ &self,
+ triple_client: &mut TripleClient,
+ path: PathAndQuery,
+ invocation: Arc<RpcInvocation>,
+ body: SdkBody,
+ ) -> Request<SdkBody> {
+ let invokers = self.directory.list(invocation.clone());
+ let invoker_url = self
+ .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
+ .expect("no valid provider");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip,
invoker_url.port))
+ .unwrap();
+ triple_client.map_request(http_uri, path, body)
+ }
+}
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/cluster/support/mod.rs
similarity index 85%
copy from dubbo/src/common/consts.rs
copy to dubbo/src/cluster/support/mod.rs
index c05c2c7..ae42cc2 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/cluster/support/mod.rs
@@ -15,6 +15,6 @@
* limitations under the License.
*/
-pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
-pub const PROTOCOL: &str = "protocol";
-pub const REGISTRY: &str = "registry";
+pub mod cluster_invoker;
+
+pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 5d0a273..98d784f 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,7 +27,10 @@ pub use hyper::Body as hyperBody;
pub use tower_service::Service;
pub use super::{
- cluster::directory::{Directory, RegistryDirectory},
+ cluster::{
+ directory::{Directory, RegistryDirectory},
+ support::cluster_invoker::ClusterInvoker,
+ },
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs
index c05c2c7..17993c8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/common/consts.rs
@@ -18,3 +18,15 @@
pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
pub const PROTOCOL: &str = "protocol";
pub const REGISTRY: &str = "registry";
+
+// URL key
+pub const DUBBO_KEY: &str = "dubbo";
+pub const PROVIDERS_KEY: &str = "providers";
+pub const LOCALHOST_IP: &str = "127.0.0.1";
+pub const METADATA_MAPPING_KEY: &str = "mapping";
+pub const VERSION_KEY: &str = "version";
+pub const GROUP_KEY: &str = "group";
+pub const INTERFACE_KEY: &str = "interface";
+pub const ANYHOST_KEY: &str = "anyhost";
+pub const SIDE_KEY: &str = "side";
+pub const TIMESTAMP_KEY: &str = "timestamp";
diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs
index 29f9e2c..2a36d72 100644
--- a/dubbo/src/common/url.rs
+++ b/dubbo/src/common/url.rs
@@ -15,16 +15,26 @@
* limitations under the License.
*/
-use std::collections::HashMap;
+use std::{
+ collections::HashMap,
+ fmt::{Display, Formatter},
+};
+
+use crate::common::consts::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
+use http::Uri;
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Url {
- pub uri: String,
- pub protocol: String,
+ pub raw_url_string: String,
+ // value of scheme is different to protocol name, eg. triple -> tri://
+ pub scheme: String,
pub location: String,
pub ip: String,
pub port: String,
- pub service_key: Vec<String>,
+ // serviceKey format in dubbo java and go
'{group}/{interfaceName}:{version}'
+ pub service_key: String,
+ // same to interfaceName
+ pub service_name: String,
pub params: HashMap<String, String>,
}
@@ -41,88 +51,189 @@ impl Url {
tracing::error!("fail to parse url({}), err: {:?}", url, err);
})
.unwrap();
- let mut u = Self {
- uri: uri.to_string(),
- protocol: uri.scheme_str()?.to_string(),
+ let query = uri.path_and_query().unwrap().query();
+ let mut url_inst = Self {
+ raw_url_string: url.to_string(),
+ scheme: uri.scheme_str()?.to_string(),
ip: uri.authority()?.host().to_string(),
port: uri.authority()?.port()?.to_string(),
location: uri.authority()?.to_string(),
- service_key: uri
- .path()
- .trim_start_matches('/')
- .split(',')
- .map(|x| x.to_string())
- .collect::<Vec<_>>(),
- params: HashMap::new(),
+ service_key: uri.path().trim_start_matches('/').to_string(),
+ service_name: uri.path().trim_start_matches('/').to_string(),
+ params: if let Some(..) = query {
+ Url::decode(query.unwrap())
+ } else {
+ HashMap::new()
+ },
};
- if uri.query().is_some() {
- u.decode(uri.query().unwrap().to_string());
- u.service_key = u
- .get_param("service_names".to_string())
- .unwrap()
- .split(',')
- .map(|x| x.to_string())
- .collect::<Vec<_>>();
- }
-
- Some(u)
+ url_inst.renew_raw_url_string();
+ Some(url_inst)
}
- pub fn get_service_name(&self) -> Vec<String> {
+ pub fn get_service_key(&self) -> String {
self.service_key.clone()
}
- pub fn get_param(&self, key: String) -> Option<String> {
- self.params.get(&key).cloned()
+ pub fn get_service_name(&self) -> String {
+ self.service_name.clone()
}
- pub fn encode_param(&self) -> String {
+ pub fn get_param(&self, key: &str) -> Option<String> {
+ self.params.get(key).cloned()
+ }
+
+ fn encode_param(&self) -> String {
let mut params_vec: Vec<String> = Vec::new();
for (k, v) in self.params.iter() {
// let tmp = format!("{}={}", k, v);
params_vec.push(format!("{}={}", k, v));
}
- params_vec.join("&")
+ if params_vec.is_empty() {
+ "".to_string()
+ } else {
+ format!("?{}", params_vec.join("&"))
+ }
+ }
+
+ pub fn params_count(&self) -> usize {
+ self.params.len()
}
- pub fn decode(&mut self, params: String) {
- let p: Vec<String> = params.split('&').map(|v|
v.trim().to_string()).collect();
+ fn decode(raw_query_string: &str) -> HashMap<String, String> {
+ let mut params = HashMap::new();
+ let p: Vec<String> = raw_query_string
+ .split('&')
+ .map(|v| v.trim().to_string())
+ .collect();
for v in p.iter() {
let values: Vec<String> = v.split('=').map(|v|
v.trim().to_string()).collect();
if values.len() != 2 {
continue;
}
- self.params.insert(values[0].clone(), values[1].clone());
+ params.insert(values[0].clone(), values[1].clone());
}
+ params
+ }
+
+ pub fn set_param(&mut self, key: &str, value: &str) {
+ self.params.insert(key.to_string(), value.to_string());
+ self.renew_raw_url_string();
+ }
+
+ pub fn raw_url_string(&self) -> String {
+ self.raw_url_string.clone()
+ }
+
+ pub fn encoded_raw_url_string(&self) -> String {
+ urlencoding::encode(self.raw_url_string.as_str()).to_string()
+ }
+
+ fn build_service_key(&self) -> String {
+ format!(
+ "{group}/{interfaceName}:{version}",
+ group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
+ interfaceName =
self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
+ version =
self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
+ )
}
pub fn to_url(&self) -> String {
- format!("{}://{}:{}", self.protocol, self.ip, self.port)
+ self.raw_url_string()
+ }
+
+ fn renew_raw_url_string(&mut self) {
+ self.raw_url_string = format!(
+ "{}://{}:{}/{}{}",
+ self.scheme,
+ self.ip,
+ self.port,
+ self.service_name,
+ self.encode_param()
+ );
+ self.service_key = self.build_service_key()
+ }
+
+ // short_url is used for tcp listening
+ pub fn short_url(&self) -> String {
+ format!("{}://{}:{}", self.scheme, self.ip, self.port)
+ }
+}
+
+impl Display for Url {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.raw_url_string().as_str())
+ }
+}
+
+impl Into<Uri> for Url {
+ fn into(self) -> Uri {
+ self.raw_url_string.parse::<Uri>().unwrap()
+ }
+}
+
+impl From<&str> for Url {
+ fn from(url: &str) -> Self {
+ Url::from_url(url).unwrap()
}
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::common::consts::{ANYHOST_KEY, VERSION_KEY};
#[test]
fn test_from_url() {
- let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
- println!("{:?}", u1.unwrap().get_service_name())
+ let mut u1 =
Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
+
application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
+
environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
+
module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
+
side=provider&timeout=3000×tamp=1556509797245&version=1.0.0&application=test");
+ assert_eq!(
+ u1.as_ref().unwrap().service_key,
+ "default/com.ikurento.user.UserProvider:1.0.0"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param(ANYHOST_KEY)
+ .unwrap()
+ .as_str(),
+ "true"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param("default.timeout")
+ .unwrap()
+ .as_str(),
+ "10000"
+ );
+ assert_eq!(u1.as_ref().unwrap().scheme, "tri");
+ assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
+ assert_eq!(u1.as_ref().unwrap().port, "20000");
+ assert_eq!(u1.as_ref().unwrap().params_count(), 18);
+ u1.as_mut().unwrap().set_param("key1", "value1");
+ assert_eq!(
+ u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
+ "value1"
+ );
+ assert_eq!(
+ u1.as_ref()
+ .unwrap()
+ .get_param(VERSION_KEY)
+ .unwrap()
+ .as_str(),
+ "1.0.0"
+ );
}
#[test]
- fn test_encode_params() {
- let mut u = Url::default();
- u.params.insert("method".to_string(), "GET".to_string());
- u.params.insert("args".to_string(), "GET".to_string());
-
- let en = u.encode_param();
- println!("encode_params: {:?}", en);
-
- let mut u1 = Url::default();
- u1.decode(en);
- println!("decode_params: {:?}", u1);
- assert_eq!(u1, u);
+ fn test2() {
+ let url: Url =
"tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
+ assert_eq!(
+ url.raw_url_string(),
+ "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
+ )
}
}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index c8e4a9b..342ef6c 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,23 +15,33 @@
* limitations under the License.
*/
-use std::{collections::HashMap, pin::Pin};
+use std::{
+ collections::HashMap,
+ error::Error,
+ pin::Pin,
+ sync::{Arc, Mutex},
+};
use futures::{future, Future};
+use tracing::{debug, info};
use crate::{
common::url::Url,
protocol::{BoxExporter, Protocol},
- registry::protocol::RegistryProtocol,
+ registry::{
+ protocol::RegistryProtocol,
+ types::{Registries, RegistriesOperation},
+ BoxRegistry, Registry,
+ },
};
-use dubbo_config::{get_global_config, RootConfig};
+use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
// Invoker是否可以基于hyper写一个通用的
#[derive(Default)]
pub struct Dubbo {
protocols: HashMap<String, Vec<Url>>,
- registries: HashMap<String, Url>,
+ registries: Option<Registries>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<&'static RootConfig>,
}
@@ -41,7 +51,7 @@ impl Dubbo {
tracing_subscriber::fmt::init();
Self {
protocols: HashMap::new(),
- registries: HashMap::new(),
+ registries: None,
service_registry: HashMap::new(),
config: None,
}
@@ -52,80 +62,83 @@ impl Dubbo {
self
}
- pub fn init(&mut self) {
- let conf = self.config.get_or_insert_with(get_global_config);
- tracing::debug!("global conf: {:?}", conf);
+ pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry)
-> Self {
+ if self.registries.is_none() {
+ self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
+ }
+ self.registries
+ .as_ref()
+ .unwrap()
+ .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+ self
+ }
- for (name, url) in conf.registries.iter() {
- self.registries.insert(
- name.clone(),
- Url::from_url(url).expect(&format!("url: {url} parse
failed.")),
- );
+ pub fn init(&mut self) -> Result<(), Box<dyn Error>> {
+ if self.config.is_none() {
+ self.config = Some(get_global_config())
}
- for (service_name, service_config) in conf.provider.services.iter() {
- let protocol_url = match service_config
- .protocol_configs
- .get(&service_config.protocol)
+ let root_config = self.config.as_ref().unwrap();
+ debug!("global conf: {:?}", root_config);
+ // env::set_var("ZOOKEEPER_SERVERS",root_config);
+ for (_, service_config) in root_config.provider.services.iter() {
+ info!("init service name: {}", service_config.interface);
+ let url = if root_config
+ .protocols
+ .contains_key(service_config.protocol.as_str())
{
- Some(protocol_url) => protocol_url,
- None => {
- let Some(protocol_url) =
conf.protocols.get(&service_config.protocol) else {
- tracing::warn!("protocol: {:?} not
exists", service_config.protocol);
- continue;
- };
- protocol_url
- }
+ let protocol = root_config
+ .protocols
+ .get_protocol_or_default(service_config.protocol.as_str());
+ let protocol_url =
+ format!("{}/{}", protocol.to_url(),
service_config.interface.clone(),);
+ info!("protocol_url: {:?}", protocol_url);
+ Url::from_url(&protocol_url)
+ } else {
+ return Err(format!("protocol {:?} not exists",
service_config.protocol).into());
};
- // let protocol_url = format!(
- // "{}/{}/{}",
- // &protocol_url.to_url(),
- // service_config.name,
- // service_name
- // );
- // service_names may be multiple
- let protocol_url = protocol_url
- .to_owned()
- .add_param("service_names".to_string(),
service_name.to_string());
- let protocol_url = protocol_url.to_url();
- tracing::info!("url: {}", protocol_url);
-
- let protocol_url = Url::from_url(&protocol_url)
- .expect(&format!("protocol url: {protocol_url} parse
failed."));
- self.protocols
- .entry(service_config.name.clone())
- .and_modify(|urls| urls.push(protocol_url.clone()))
- .or_insert(vec![protocol_url]);
-
- tracing::debug!(
- "service name: {service_name}, service_config: {:?}",
- service_config
- );
- let registry = &service_config.registry;
- let reg_url = self
- .registries
- .get(registry)
- .expect(&format!("can't find the registry: {registry}"));
- self.service_registry
- .entry(service_config.name.clone())
- .and_modify(|urls| urls.push(reg_url.to_owned()))
- .or_insert(vec![reg_url.to_owned()]);
+ info!("url: {:?}", url);
+ if url.is_none() {
+ continue;
+ }
+ let u = url.unwrap();
+ if self.protocols.get(&service_config.protocol).is_some() {
+ self.protocols
+ .get_mut(&service_config.protocol)
+ .unwrap()
+ .push(u);
+ } else {
+ self.protocols
+ .insert(service_config.protocol.clone(), vec![u]);
+ }
}
+ Ok(())
}
pub async fn start(&mut self) {
- self.init();
-
+ self.init().unwrap();
+ info!("starting...");
// TODO: server registry
-
- let mem_reg =
-
Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
+ let mem_reg = Box::new(
+ RegistryProtocol::new()
+ .with_registries(self.registries.as_ref().unwrap().clone())
+ .with_services(self.service_registry.clone()),
+ );
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> +
Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
- tracing::info!("protocol: {:?}, service url: {:?}", name, url);
+ info!("protocol: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
- async_vec.push(exporter)
+ async_vec.push(exporter);
+ //TODO multiple registry
+ if self.registries.is_some() {
+ self.registries
+ .as_ref()
+ .unwrap()
+ .default_registry()
+ .register(url.clone())
+ .unwrap();
+ }
}
}
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 423e9c7..bdd152b 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -15,8 +15,9 @@
* limitations under the License.
*/
+use std::{collections::HashMap, fmt::Debug, str::FromStr};
+
use futures_core::Stream;
-use std::{collections::HashMap, str::FromStr};
pub struct Request<T> {
pub message: T,
@@ -202,7 +203,7 @@ pub struct RpcInvocation {
}
impl RpcInvocation {
- pub fn with_servie_unique_name(mut self, service_unique_name: String) ->
Self {
+ pub fn with_service_unique_name(mut self, service_unique_name: String) ->
Self {
self.target_service_unique_name = service_unique_name;
self
}
@@ -210,6 +211,9 @@ impl RpcInvocation {
self.method_name = method_name;
self
}
+ pub fn unique_fingerprint(&self) -> String {
+ format!("{}#{}", self.target_service_unique_name, self.method_name)
+ }
}
impl Invocation for RpcInvocation {
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 656da2a..2c8ad8f 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-pub mod server_desc;
-pub mod triple;
-
use std::{
+ fmt::Debug,
future::Future,
task::{Context, Poll},
};
@@ -29,6 +27,9 @@ use tower_service::Service;
use crate::common::url::Url;
+pub mod server_desc;
+pub mod triple;
+
#[async_trait]
pub trait Protocol {
type Invoker;
@@ -42,7 +43,7 @@ pub trait Exporter {
fn unexport(&self);
}
-pub trait Invoker<ReqBody> {
+pub trait Invoker<ReqBody>: Debug {
type Response;
type Error;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs
b/dubbo/src/protocol/triple/triple_invoker.rs
index df4d2ae..2bcc2d3 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,17 +15,11 @@
* limitations under the License.
*/
-use std::str::FromStr;
-
use aws_smithy_http::body::SdkBody;
+use std::fmt::{Debug, Formatter};
use tower_service::Service;
-use crate::{
- common::url::Url,
- protocol::Invoker,
- triple::{client::builder::ClientBoxService,
transport::connection::Connection},
- utils::boxed::BoxService,
-};
+use crate::{common::url::Url, protocol::Invoker,
triple::client::builder::ClientBoxService};
pub struct TripleInvoker {
url: Url,
@@ -33,13 +27,18 @@ pub struct TripleInvoker {
}
impl TripleInvoker {
- pub fn new(url: Url) -> TripleInvoker {
- let uri = http::Uri::from_str(&url.to_url()).unwrap();
- let conn = Connection::new().with_host(uri);
- Self {
- url,
- conn: BoxService::new(conn),
- }
+ // pub fn new(url: Url) -> TripleInvoker {
+ // let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ // Self {
+ // url,
+ // conn: ClientBuilder::from_uri(&uri).build()connect(),
+ // }
+ // }
+}
+
+impl Debug for TripleInvoker {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(format!("{:?}", self.url).as_str())
}
}
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs
b/dubbo/src/protocol/triple/triple_protocol.rs
index 3577f52..c6ade9b 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -47,7 +47,7 @@ impl TripleProtocol {
pub fn get_server(&self, url: Url) -> Option<TripleServer> {
self.servers
- .get(&url.service_key.join(","))
+ .get(&url.service_key)
.map(|data| data.to_owned())
}
}
@@ -61,10 +61,10 @@ impl Protocol for TripleProtocol {
}
async fn export(mut self, url: Url) -> BoxExporter {
+ // service_key is same to key of TRIPLE_SERVICES
let server = TripleServer::new();
- self.servers
- .insert(url.service_key.join(","), server.clone());
- server.serve(url).await;
+ self.servers.insert(url.service_key.clone(), server.clone());
+ server.serve(url.short_url().as_str().into()).await;
Box::new(TripleExporter::new())
}
diff --git a/config/src/lib.rs b/dubbo/src/registry/integration.rs
similarity index 75%
copy from config/src/lib.rs
copy to dubbo/src/registry/integration.rs
index 70a33cd..15b82d0 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,19 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use crate::{cluster::support::cluster_invoker::ClusterInvoker,
registry::BoxRegistry};
+use std::sync::Arc;
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
-
-pub use config::*;
-
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {
- let result = 2 + 2;
- assert_eq!(result, 4);
- }
+pub trait ClusterRegistryIntegration {
+ /// get cluster invoker struct
+ fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
}
diff --git a/dubbo/src/registry/memory_registry.rs
b/dubbo/src/registry/memory_registry.rs
index 29e0ec2..61d3e41 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -16,10 +16,12 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
+
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
+use tracing::debug;
use crate::common::url::Url;
@@ -47,9 +49,9 @@ impl MemoryRegistry {
impl Registry for MemoryRegistry {
type NotifyListener = MemoryNotifyListener;
- fn register(&mut self, mut url: crate::common::url::Url) -> Result<(),
crate::StdError> {
+ fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> {
// define provider label: ${registry.group}/${service_name}/provider
- let registry_group = match
url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+ let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Some(key) => key,
None => "dubbo".to_string(),
};
@@ -57,20 +59,20 @@ impl Registry for MemoryRegistry {
let dubbo_path = format!(
"/{}/{}/{}",
registry_group,
- url.get_service_name().join(","),
+ url.get_service_name(),
"provider",
);
url.params.insert("anyhost".to_string(), "true".to_string());
// define triple url path
- let raw_url = format!("{}?{}", url.to_url(), url.encode_param(),);
+ let raw_url = url.raw_url_string();
self.registries.write().unwrap().insert(dubbo_path, raw_url);
Ok(())
}
fn unregister(&mut self, url: crate::common::url::Url) -> Result<(),
crate::StdError> {
- let registry_group = match
url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+ let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Some(key) => key,
None => "dubbo".to_string(),
};
@@ -78,7 +80,7 @@ impl Registry for MemoryRegistry {
let dubbo_path = format!(
"/{}/{}/{}",
registry_group,
- url.get_service_name().join(","),
+ url.get_service_name(),
"provider",
);
self.registries.write().unwrap().remove(&dubbo_path);
@@ -109,6 +111,7 @@ pub struct MemoryNotifyListener {
impl NotifyListener for MemoryNotifyListener {
fn notify(&self, event: super::ServiceEvent) {
+ debug!("notify {:?}", event);
let mut map = self.service_instances.write().expect("msg");
match event.action.as_str() {
"ADD" => map.insert(event.key, event.service),
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 8c56692..6352cf1 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -16,12 +16,14 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
+pub mod integration;
pub mod memory_registry;
pub mod protocol;
+pub mod types;
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
-use crate::common::url::Url;
+use crate::{common::url::Url, registry::memory_registry::MemoryNotifyListener};
pub trait Registry {
type NotifyListener;
@@ -38,18 +40,24 @@ pub trait NotifyListener {
fn notify_all(&self, event: ServiceEvent);
}
+#[derive(Debug)]
pub struct ServiceEvent {
pub key: String,
pub action: String,
pub service: Vec<Url>,
}
-pub type BoxRegistry =
- Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> +
Send + Sync>;
+pub type BoxRegistry = Box<dyn Registry<NotifyListener = MemoryNotifyListener>
+ Send + Sync>;
+
+impl Debug for BoxRegistry {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str("BoxRegistry")
+ }
+}
#[derive(Default)]
pub struct RegistryWrapper {
- pub registry: Option<Box<dyn Registry<NotifyListener =
memory_registry::MemoryNotifyListener>>>,
+ pub registry: Option<Box<dyn Registry<NotifyListener =
MemoryNotifyListener>>>,
}
impl Clone for RegistryWrapper {
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 45a24cb..04b8ac9 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -17,7 +17,8 @@
use std::{
collections::HashMap,
- sync::{Arc, RwLock},
+ fmt::{Debug, Formatter},
+ sync::{Arc, Mutex, RwLock},
};
use super::{memory_registry::MemoryRegistry, BoxRegistry};
@@ -27,27 +28,46 @@ use crate::{
triple::{triple_exporter::TripleExporter,
triple_protocol::TripleProtocol},
BoxExporter, BoxInvoker, Protocol,
},
+ registry::types::Registries,
};
#[derive(Clone, Default)]
pub struct RegistryProtocol {
// registerAddr: Registry
- registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
+ registries: Option<Registries>,
// providerUrl: Exporter
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
services: HashMap<String, Vec<Url>>,
}
+impl Debug for RegistryProtocol {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(
+ format!(
+ "RegistryProtocol services{:#?},registries,{:#?}",
+ self.services,
+ self.registries.clone()
+ )
+ .as_str(),
+ )
+ }
+}
+
impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
- registries: Arc::new(RwLock::new(HashMap::new())),
+ registries: None,
exporters: Arc::new(RwLock::new(HashMap::new())),
services: HashMap::new(),
}
}
+ pub fn with_registries(mut self, registries: Registries) -> Self {
+ self.registries = Some(registries);
+ self
+ }
+
pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) ->
Self {
self.services.extend(services);
self
@@ -56,9 +76,11 @@ impl RegistryProtocol {
pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
let mem = MemoryRegistry::default();
self.registries
- .write()
+ .as_ref()
+ .unwrap()
+ .lock()
.unwrap()
- .insert(url.location, Box::new(mem.clone()));
+ .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
Box::new(mem)
}
@@ -78,23 +100,23 @@ impl Protocol for RegistryProtocol {
// init Exporter based on provider_url
// server registry based on register_url
// start server health check
- let registry_url =
self.services.get(url.get_service_name().join(",").as_str());
+ let registry_url = self.services.get(url.get_service_name().as_str());
if let Some(urls) = registry_url {
for url in urls.clone().iter() {
- if !url.protocol.is_empty() {
+ if !url.service_key.is_empty() {
let mut reg = self.get_registry(url.clone());
reg.register(url.clone()).unwrap();
}
}
}
- match url.clone().protocol.as_str() {
- "triple" => {
+ match url.clone().scheme.as_str() {
+ "tri" => {
let pro = Box::new(TripleProtocol::new());
return pro.export(url).await;
}
_ => {
- tracing::error!("protocol {:?} not implemented", url.protocol);
+ tracing::error!("protocol {:?} not implemented", url.scheme);
Box::new(TripleExporter::new())
}
}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
new file mode 100644
index 0000000..77d253a
--- /dev/null
+++ b/dubbo/src/registry/types.rs
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+ collections::HashMap,
+ sync::{Arc, Mutex},
+};
+
+use itertools::Itertools;
+use tracing::info;
+
+use crate::{
+ common::url::Url,
+ registry::{memory_registry::MemoryNotifyListener, BoxRegistry, Registry},
+ StdError,
+};
+
+pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
+pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+
+pub const DEFAULT_REGISTRY_KEY: &str = "default";
+
+pub trait RegistriesOperation {
+ fn get(&self, registry_key: &str) -> SafeRegistry;
+ fn insert(&self, registry_key: String, registry: SafeRegistry);
+ fn default_registry(&self) -> SafeRegistry;
+}
+
+impl RegistriesOperation for Registries {
+ fn get(&self, registry_key: &str) -> SafeRegistry {
+ self.as_ref()
+ .lock()
+ .unwrap()
+ .get(registry_key)
+ .unwrap()
+ .clone()
+ }
+
+ fn insert(&self, registry_key: String, registry: SafeRegistry) {
+ self.as_ref().lock().unwrap().insert(registry_key, registry);
+ }
+
+ fn default_registry(&self) -> SafeRegistry {
+ let guard = self.as_ref().lock().unwrap();
+ let (_, result) = guard
+ .iter()
+ .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
+ .unwrap()
+ .to_owned();
+ result.clone()
+ }
+}
+
+impl Registry for SafeRegistry {
+ type NotifyListener = MemoryNotifyListener;
+
+ fn register(&mut self, url: Url) -> Result<(), StdError> {
+ info!("register {}.", url);
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn subscribe(&self, url: Url, listener: Self::NotifyListener) ->
Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+
+ fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) ->
Result<(), StdError> {
+ self.lock().unwrap().register(url).expect("registry err.");
+ Ok(())
+ }
+}
diff --git a/dubbo/src/triple/client/builder.rs
b/dubbo/src/triple/client/builder.rs
index 4746249..cf667cc 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -16,8 +16,10 @@
*/
use crate::{
- cluster::directory::StaticDirectory, codegen::Directory,
- triple::compression::CompressionEncoding, utils::boxed::BoxService,
+ cluster::directory::StaticDirectory,
+ codegen::{ClusterInvoker, Directory, RegistryDirectory},
+ triple::compression::CompressionEncoding,
+ utils::boxed::BoxService,
};
use aws_smithy_http::body::SdkBody;
@@ -32,6 +34,7 @@ pub struct ClientBuilder {
pub timeout: Option<u64>,
pub connector: &'static str,
directory: Option<Box<dyn Directory>>,
+ cluster_invoker: Option<ClusterInvoker>,
}
impl ClientBuilder {
@@ -40,6 +43,7 @@ impl ClientBuilder {
timeout: None,
connector: "",
directory: None,
+ cluster_invoker: None,
}
}
@@ -48,6 +52,7 @@ impl ClientBuilder {
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
+ cluster_invoker: None,
}
}
@@ -56,6 +61,7 @@ impl ClientBuilder {
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
+ cluster_invoker: None,
}
}
@@ -70,6 +76,15 @@ impl ClientBuilder {
pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
Self {
directory: Some(directory),
+ cluster_invoker: None,
+ ..self
+ }
+ }
+
+ pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
+ Self {
+ directory: None,
+ cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
..self
}
}
@@ -84,6 +99,7 @@ impl ClientBuilder {
pub fn with_connector(self, connector: &'static str) -> Self {
Self {
connector: connector,
+ cluster_invoker: None,
..self
}
}
@@ -92,6 +108,7 @@ impl ClientBuilder {
TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: self.directory,
+ cluster_invoker: self.cluster_invoker,
}
}
}
diff --git a/dubbo/src/triple/client/triple.rs
b/dubbo/src/triple/client/triple.rs
index 454e953..56edb96 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-use std::str::FromStr;
+use std::{str::FromStr, sync::Arc};
use futures_util::{future, stream, StreamExt, TryStreamExt};
@@ -25,12 +25,10 @@ use rand::prelude::SliceRandom;
use tower_service::Service;
use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::{
- cluster::{FailoverCluster, MockDirectory},
- codegen::{Directory, RpcInvocation},
-};
+use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
use crate::{
+ cluster::support::cluster_invoker::ClusterRequestBuilder,
invocation::{IntoStreamingRequest, Metadata, Request, Response},
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding,
encode::encode},
};
@@ -39,6 +37,7 @@ use crate::{
pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
pub(crate) directory: Option<Box<dyn Directory>>,
+ pub(crate) cluster_invoker: Option<ClusterInvoker>,
}
impl TripleClient {
@@ -52,7 +51,14 @@ impl TripleClient {
builder.build()
}
- fn map_request(
+ pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
+ TripleClient {
+ cluster_invoker: Some(invoker),
+ ..self
+ }
+ }
+
+ pub fn map_request(
&self,
uri: http::Uri,
path: http::uri::PathAndQuery,
@@ -144,19 +150,28 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let bytes = hyper::body::to_bytes(body).await.unwrap();
- let sdk_body = SdkBody::from(bytes);
-
- let url_list = self.directory.as_ref().expect("msg").list(invocation);
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
-
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let sdk_body = SdkBody::from(body);
+ let arc_invocation = Arc::new(invocation);
+ let req;
+ let http_uri;
+ if self.cluster_invoker.is_some() {
+ let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
+ req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
+ http_uri = req.uri().clone();
+ } else {
+ let url_list = self
+ .directory
+ .as_ref()
+ .expect("msg")
+ .list(arc_invocation.clone());
+ let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
+ http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+ req = self.map_request(http_uri.clone(), path, sdk_body);
+ }
- // let mut conn = Connection::new().with_host(http_uri);
- let mut cluster = FailoverCluster::new(Box::new(MockDirectory {}));
- let response = cluster
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -210,14 +225,24 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
-
- let url_list = self.directory.as_ref().expect("msg").list(invocation);
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
-
- let req = self.map_request(http_uri.clone(), path, sdk_body);
-
+ let arc_invocation = Arc::new(invocation);
+ let req;
+ let http_uri;
+ if self.cluster_invoker.is_some() {
+ let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
+ req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
+ http_uri = req.uri().clone();
+ } else {
+ let url_list = self
+ .directory
+ .as_ref()
+ .expect("msg")
+ .list(arc_invocation.clone());
+ let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
+ http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+ req = self.map_request(http_uri.clone(), path, sdk_body);
+ }
let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
@@ -257,14 +282,24 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
-
- let url_list = self.directory.as_ref().expect("msg").list(invocation);
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
-
- let req = self.map_request(http_uri.clone(), path, sdk_body);
-
+ let arc_invocation = Arc::new(invocation);
+ let req;
+ let http_uri;
+ if self.cluster_invoker.is_some() {
+ let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
+ req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
+ http_uri = req.uri().clone();
+ } else {
+ let url_list = self
+ .directory
+ .as_ref()
+ .expect("msg")
+ .list(arc_invocation.clone());
+ let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
+ http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+ req = self.map_request(http_uri.clone(), path, sdk_body);
+ }
let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
@@ -320,13 +355,24 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);
let sdk_body = SdkBody::from(body);
-
- let url_list = self.directory.as_ref().expect("msg").list(invocation);
- let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
- let http_uri =
- http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
-
- let req = self.map_request(http_uri.clone(), path, sdk_body);
+ let arc_invocation = Arc::new(invocation);
+ let req;
+ let http_uri;
+ if self.cluster_invoker.is_some() {
+ let cluster_invoker =
self.cluster_invoker.as_ref().unwrap().clone();
+ req = cluster_invoker.build_req(self, path,
arc_invocation.clone(), sdk_body);
+ http_uri = req.uri().clone();
+ } else {
+ let url_list = self
+ .directory
+ .as_ref()
+ .expect("msg")
+ .list(arc_invocation.clone());
+ let real_url = url_list.choose(&mut
rand::thread_rng()).expect("msg");
+ http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+ req = self.map_request(http_uri.clone(), path, sdk_body);
+ }
let mut conn = Connection::new().with_host(http_uri);
let response = conn
diff --git a/dubbo/src/triple/compression.rs b/dubbo/src/triple/compression.rs
index 5e3ee86..9147b84 100644
--- a/dubbo/src/triple/compression.rs
+++ b/dubbo/src/triple/compression.rs
@@ -113,12 +113,12 @@ fn test_compress() {
let len = src.len();
src.reserve(len);
- compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
+ compress(CompressionEncoding::Gzip, &mut src, &mut dst, len).unwrap();
println!("src: {:?}, dst: {:?}", src, dst);
let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
let de_len = dst.len();
- decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
+ decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst,
de_len).unwrap();
println!("src: {:?}, dst: {:?}", dst, de_dst);
}
diff --git a/dubbo/src/triple/server/builder.rs
b/dubbo/src/triple/server/builder.rs
index d1bcbcc..af3cc6d 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -90,14 +90,14 @@ impl ServerBuilder {
}
pub async fn serve(self) -> Result<(), crate::Error> {
- tracing::info!("server starting. addr: {:?}", self.addr);
+ tracing::info!("server starting. addr: {:?}", self.addr.unwrap());
self.server.serve(self.addr.unwrap()).await
}
}
impl From<Url> for ServerBuilder {
fn from(u: Url) -> Self {
- let uri = match http::Uri::from_str(&u.to_url()) {
+ let uri = match http::Uri::from_str(&u.raw_url_string()) {
Ok(v) => v,
Err(err) => {
tracing::error!("http uri parse error: {}, url: {:?}", err,
&u);
@@ -108,9 +108,9 @@ impl From<Url> for ServerBuilder {
let authority = uri.authority().unwrap();
Self {
- listener: u.get_param("listener".to_string()).unwrap(),
+ listener: u.get_param("listener").unwrap_or("tcp".to_string()),
addr: authority.to_string().to_socket_addrs().unwrap().next(),
- service_names: u.service_key,
+ service_names: vec![u.service_name],
server: DubboServer::default(),
}
}
diff --git a/dubbo/src/triple/transport/connection.rs
b/dubbo/src/triple/transport/connection.rs
index fe9cc16..c99b399 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -19,6 +19,7 @@ use std::task::Poll;
use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
+use tracing::debug;
use crate::{boxed, triple::transport::connector::get_connector};
@@ -84,6 +85,7 @@ where
let mut connector = Connect::new(get_connector(self.connector),
builder);
let uri = self.host.clone();
let fut = async move {
+ debug!("send rpc call to {}", uri);
let mut con = connector.call(uri).await.unwrap();
con.call(req)
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index 73c6da2..4a40d66 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -103,25 +103,8 @@ impl Echo for EchoServerImpl {
}))
}
- async fn client_streaming_echo(
- &self,
- req: Request<Decoding<EchoRequest>>,
- ) -> Result<Response<EchoResponse>, dubbo::status::Status> {
- let mut s = req.into_inner();
- loop {
- let result = s.next().await;
- match result {
- Some(Ok(val)) => println!("result: {:?}", val),
- Some(Err(val)) => println!("err: {:?}", val),
- None => break,
- }
- }
- Ok(Response::new(EchoResponse {
- message: "hello client streaming".to_string(),
- }))
- }
-
type ServerStreamingEchoStream = ResponseStream;
+
async fn server_streaming_echo(
&self,
req: Request<EchoRequest>,
@@ -143,6 +126,23 @@ impl Echo for EchoServerImpl {
Ok(Response::new(Box::pin(resp)))
}
+ async fn client_streaming_echo(
+ &self,
+ req: Request<Decoding<EchoRequest>>,
+ ) -> Result<Response<EchoResponse>, dubbo::status::Status> {
+ let mut s = req.into_inner();
+ loop {
+ let result = s.next().await;
+ match result {
+ Some(Ok(val)) => println!("result: {:?}", val),
+ Some(Err(val)) => println!("err: {:?}", val),
+ None => break,
+ }
+ }
+ Ok(Response::new(EchoResponse {
+ message: "hello client streaming".to_string(),
+ }))
+ }
type BidirectionalStreamingEchoStream = ResponseStream;
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs
b/examples/echo/src/generated/grpc.examples.echo.rs
index aa5d82b..c095ddf 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -31,6 +31,10 @@ pub mod echo_client {
inner: TripleClient::new(builder),
}
}
+ pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
+ self.inner = self.inner.with_cluster(invoker);
+ self
+ }
/// UnaryEcho is unary echo.
pub async fn unary_echo(
&mut self,
@@ -41,7 +45,7 @@ pub mod echo_client {
super::EchoResponse,
>::default();
let invocation = RpcInvocation::default()
-
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/UnaryEcho",
@@ -58,7 +62,7 @@ pub mod echo_client {
super::EchoResponse,
>::default();
let invocation = RpcInvocation::default()
-
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
@@ -75,7 +79,7 @@ pub mod echo_client {
super::EchoResponse,
>::default();
let invocation = RpcInvocation::default()
-
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
@@ -92,7 +96,7 @@ pub mod echo_client {
super::EchoResponse,
>::default();
let invocation = RpcInvocation::default()
-
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index c4b20cc..c34b19f 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,7 +24,6 @@ async-trait = "0.1.56"
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.2.0"
-
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version =
"0.2.0"}
diff --git a/examples/greeter/dubbo.yaml b/examples/greeter/dubbo.yaml
index b6552f8..96daf95 100644
--- a/examples/greeter/dubbo.yaml
+++ b/examples/greeter/dubbo.yaml
@@ -1,17 +1,17 @@
-name: dubbo
-provider:
- services:
- org.apache.dubbo.sample.tri.Greeter:
- version: 1.0.0
- group: test
- protocol: triple
- registry: 'zookeeper'
- serializer: json
-registries:
- zookeeper: zookeeper://localhost:2181/
-protocols:
- triple:
- ip: 0.0.0.0
- port: '8888'
- name: triple
- listener: tcp
+dubbo:
+ protocols:
+ triple:
+ ip: 0.0.0.0
+ port: '8888'
+ name: tri
+ registries:
+ demoZK:
+ protocol: zookeeper
+ address: 10.0.6.6:2181
+ provider:
+ services:
+ GreeterProvider:
+ version: 1.0.0
+ group: test
+ protocol: triple
+ interface: org.apache.dubbo.sample.tri.Greeter
\ No newline at end of file
diff --git a/examples/greeter/src/greeter/client.rs
b/examples/greeter/src/greeter/client.rs
index c493d60..d620843 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,6 +21,7 @@ pub mod protos {
}
use dubbo::codegen::*;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
use tracing::Level;
@@ -38,7 +39,7 @@ async fn main() {
tracing::subscriber::set_global_default(subscriber).expect("setting
default subscriber failed");
- let mut cli =
GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888"));
+ // let mut cli =
GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888"));
// Here is example for zk
// let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
@@ -49,6 +50,10 @@ async fn main() {
// let directory = RegistryDirectory::new(Box::new(zkr));
// cli = cli.with_directory(Box::new(directory));
+ let zkr = ZookeeperRegistry::default();
+ let directory = RegistryDirectory::new(Box::new(zkr));
+ let mut cli =
GreeterClient::new(ClientBuilder::new().with_registry_directory(directory));
+ // using loop for loadbalance test
println!("# unary call");
let resp = cli
.greet(Request::new(GreeterRequest {
diff --git a/examples/greeter/src/greeter/server.rs
b/examples/greeter/src/greeter/server.rs
index 37de662..271a54b 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -15,47 +15,48 @@
* limitations under the License.
*/
-pub mod protos {
- #![allow(non_camel_case_types)]
- include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
-}
-
-use futures_util::StreamExt;
-use protos::{
- greeter_server::{register_server, Greeter},
- GreeterReply, GreeterRequest,
-};
-
use std::{io::ErrorKind, pin::Pin};
use async_trait::async_trait;
-use futures_util::Stream;
+use futures_util::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
+use tracing::info;
use dubbo::{codegen::*, Dubbo};
use dubbo_config::RootConfig;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+use protos::{
+ greeter_server::{register_server, Greeter},
+ GreeterReply, GreeterRequest,
+};
+
+pub mod protos {
+ #![allow(non_camel_case_types)]
+ include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
+}
type ResponseStream =
Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> +
Send>>;
#[tokio::main]
async fn main() {
+ use tracing::{span, Level};
+ let span = span!(Level::DEBUG, "greeter.server");
+ let _enter = span.enter();
register_server(GreeterServerImpl {
name: "greeter".to_string(),
});
-
- // Dubbo::new().start().await;
- Dubbo::new()
- .with_config({
- let r = RootConfig::new();
- match r.load() {
- Ok(config) => config,
- Err(_err) => panic!("err: {:?}", _err), // response was droped
- }
- })
- .start()
- .await;
+ let zkr = ZookeeperRegistry::default();
+ let r = RootConfig::new();
+ let r = match r.load() {
+ Ok(config) => config,
+ Err(_err) => panic!("err: {:?}", _err), // response was droped
+ };
+ let mut f = Dubbo::new()
+ .with_config(r)
+ .add_registry("zookeeper", Box::new(zkr));
+ f.start().await;
}
#[allow(dead_code)]
@@ -71,7 +72,7 @@ impl Greeter for GreeterServerImpl {
&self,
request: Request<GreeterRequest>,
) -> Result<Response<GreeterReply>, dubbo::status::Status> {
- println!("GreeterServer::greet {:?}", request.metadata);
+ info!("GreeterServer::greet {:?}", request.metadata);
Ok(Response::new(GreeterReply {
message: "hello, dubbo-rust".to_string(),
diff --git a/registry-nacos/src/nacos_registry.rs
b/registry-nacos/src/nacos_registry.rs
index a020e6b..042d6f2 100644
--- a/registry-nacos/src/nacos_registry.rs
+++ b/registry-nacos/src/nacos_registry.rs
@@ -142,9 +142,9 @@ impl Registry for NacosRegistry {
type NotifyListener = Arc<dyn NotifyListener + Sync + Send + 'static>;
fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let side = url.get_param(SIDE_KEY.to_owned()).unwrap_or_default();
+ let side = url.get_param(SIDE_KEY).unwrap_or_default();
let register_consumer = url
- .get_param(REGISTER_CONSUMER_URL_KEY.to_owned())
+ .get_param(REGISTER_CONSUMER_URL_KEY)
.unwrap_or_else(|| false.to_string())
.parse::<bool>()
.unwrap_or(false);
@@ -313,21 +313,17 @@ struct NacosServiceName {
impl NacosServiceName {
fn new(url: &Url) -> NacosServiceName {
- let service_interface = url
- .get_service_name()
- .into_iter()
- .next()
- .unwrap_or_default();
+ let service_interface = url.get_service_name();
- let category =
url.get_param(CATEGORY_KEY.to_owned()).unwrap_or_default();
+ let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
- let version =
url.get_param(VERSION_KEY.to_owned()).unwrap_or_default();
+ let version = url.get_param(VERSION_KEY).unwrap_or_default();
- let group = url.get_param(GROUP_KEY.to_owned()).unwrap_or_default();
+ let group = url.get_param(GROUP_KEY).unwrap_or_default();
Self {
category,
- service_interface,
+ service_interface: service_interface.clone(),
version,
group,
}
diff --git a/registry-nacos/src/utils/mod.rs b/registry-nacos/src/utils/mod.rs
index 94f905a..2cca6a2 100644
--- a/registry-nacos/src/utils/mod.rs
+++ b/registry-nacos/src/utils/mod.rs
@@ -40,7 +40,7 @@ pub(crate) fn build_nacos_client_props(url: &Url) ->
(nacos_sdk::api::props::Cli
let host = &url.ip;
let port = &url.port;
let backup = url
- .get_param(BACKUP_KEY.to_owned())
+ .get_param(BACKUP_KEY)
.map(|mut data| {
data.insert(0, ',');
data
@@ -49,13 +49,13 @@ pub(crate) fn build_nacos_client_props(url: &Url) ->
(nacos_sdk::api::props::Cli
let server_addr = format!("{}:{}{}", host, port, backup);
let namespace = url
- .get_param(NAMESPACE_KEY.to_owned())
- .unwrap_or_else(|| DEFAULT_NAMESPACE.to_owned());
+ .get_param(NAMESPACE_KEY)
+ .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
let app_name = url
- .get_param(APP_NAME_KEY.to_owned())
- .unwrap_or_else(|| UNKNOWN_APP.to_owned());
- let username = url.get_param(USERNAME_KEY.to_owned()).unwrap_or_default();
- let password = url.get_param(PASSWORD_KEY.to_owned()).unwrap_or_default();
+ .get_param(APP_NAME_KEY)
+ .unwrap_or_else(|| UNKNOWN_APP.to_string());
+ let username = url.get_param(USERNAME_KEY).unwrap_or_default();
+ let password = url.get_param(PASSWORD_KEY).unwrap_or_default();
let enable_auth = !password.is_empty() && !username.is_empty();
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
index cf040fa..722462a 100644
--- a/registry-zookeeper/Cargo.toml
+++ b/registry-zookeeper/Cargo.toml
@@ -7,8 +7,9 @@ license = "Apache-2.0"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-zookeeper = "0.6.1"
+zookeeper = "0.7.0"
dubbo = {path = "../dubbo/", version = "0.2.0"}
serde_json = "1.0"
serde = {version = "1.0.145",features = ["derive"]}
tracing = "0.1"
+urlencoding = "2.1.2"
\ No newline at end of file
diff --git a/registry-zookeeper/src/zookeeper_registry.rs
b/registry-zookeeper/src/zookeeper_registry.rs
index be4e24b..a9fc9ec 100644
--- a/registry-zookeeper/src/zookeeper_registry.rs
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -17,24 +17,36 @@
#![allow(unused_variables, dead_code, missing_docs)]
-use dubbo::{
- common::url::Url,
- registry::{memory_registry::MemoryNotifyListener, NotifyListener,
Registry, ServiceEvent},
- StdError,
-};
-use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
- sync::{Arc, RwLock},
+ env,
+ sync::{Arc, Mutex, RwLock},
time::Duration,
};
-use tracing::info;
-use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+use serde::{Deserialize, Serialize};
+use tracing::{error, info};
+#[allow(unused_imports)]
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher,
ZooKeeper};
+
+use dubbo::{
+ cluster::support::cluster_invoker::ClusterInvoker,
+ codegen::BoxRegistry,
+ common::{
+ consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
+ url::Url,
+ },
+ registry::{
+ integration::ClusterRegistryIntegration,
+ memory_registry::{MemoryNotifyListener, MemoryRegistry},
+ NotifyListener, Registry, ServiceEvent,
+ },
+ StdError,
+};
-// extract service registry metadata from url
-/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
-/// dubboPath = fmt.Sprintf("/%s/%s/%s",
r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c),
common.DubboNodes[common.PROVIDER])
+// 从url中获取服务注册的元数据
+// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+// dubboPath = fmt.Sprintf("/%s/%s/%s",
r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c),
common.DubboNodes[common.PROVIDER])
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
@@ -49,20 +61,8 @@ impl Watcher for LoggingWatcher {
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
-
listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as
Registry>::NotifyListener>>>,
-}
-
-pub struct MyNotifyListener {}
-
-impl NotifyListener for MyNotifyListener {
- fn notify(&self, event: dubbo::registry::ServiceEvent) {
- todo!()
- }
-
- fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
- todo!()
- }
+ memory_registry: Arc<Mutex<MemoryRegistry>>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -90,11 +90,12 @@ impl ZookeeperRegistry {
pub fn new(connect_string: &str) -> ZookeeperRegistry {
let zk_client =
ZooKeeper::connect(connect_string, Duration::from_secs(15),
LoggingWatcher).unwrap();
+ info!("zk server connect string: {}", connect_string);
ZookeeperRegistry {
root_path: "/services".to_string(),
zk_client: Arc::new(zk_client),
-
listeners: RwLock::new(HashMap::new()),
+ memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
}
}
@@ -106,15 +107,16 @@ impl ZookeeperRegistry {
) -> ServiceInstancesChangedListener {
let mut service_names = HashSet::new();
service_names.insert(service_name.clone());
- return ServiceInstancesChangedListener {
+ ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
- path: path,
-
+ path,
service_name: service_name.clone(),
- listener: listener,
- };
+ listener,
+ }
}
+ // metadata /dubbo/mapping designed for application discovery; deprecated
for currently using interface discovery
+ // #[deprecated]
fn get_app_name(&self, service_name: String) -> String {
let res = self
.zk_client
@@ -127,25 +129,155 @@ impl ZookeeperRegistry {
};
s.to_string()
}
+
+ pub fn get_client(&self) -> Arc<ZooKeeper> {
+ self.zk_client.clone()
+ }
+
+ // If the parent node does not exist in the ZooKeeper,
Err(ZkError::NoNode) will be returned.
+ pub fn create_path(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ if self.exists_path(path) {
+ self.zk_client
+ .set_data(path, data.as_bytes().to_vec(), None)
+ .unwrap_or_else(|_| panic!("set data to {} failed.", path));
+ return Ok(());
+ }
+ let zk_result = self.zk_client.create(
+ path,
+ data.as_bytes().to_vec(),
+ Acl::open_unsafe().clone(),
+ create_mode,
+ );
+ match zk_result {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ error!("zk path {} parent not exists.", path);
+ Err(Box::try_from(err).unwrap())
+ }
+ }
+ }
+
+ // For avoiding Err(ZkError::NoNode) when parent node is't exists
+ pub fn create_path_with_parent_check(
+ &self,
+ path: &str,
+ data: &str,
+ create_mode: CreateMode,
+ ) -> Result<(), StdError> {
+ let nodes: Vec<&str> = path.split('/').collect();
+ let mut current: String = String::new();
+ let children = *nodes.last().unwrap();
+ for node_key in nodes {
+ if node_key.is_empty() {
+ continue;
+ };
+ current.push('/');
+ current.push_str(node_key);
+ if !self.exists_path(current.as_str()) {
+ let new_create_mode = match children == node_key {
+ true => create_mode,
+ false => CreateMode::Persistent,
+ };
+ let new_data = match children == node_key {
+ true => data,
+ false => "",
+ };
+ self.create_path(current.as_str(), new_data, new_create_mode)
+ .unwrap();
+ }
+ }
+ Ok(())
+ }
+
+ pub fn delete_path(&self, path: &str) {
+ if self.exists_path(path) {
+ self.get_client().delete(path, None).unwrap()
+ }
+ }
+
+ pub fn exists_path(&self, path: &str) -> bool {
+ self.zk_client.exists(path, false).unwrap().is_some()
+ }
+
+ pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
+ if self.exists_path(path) {
+ let zk_result = self.zk_client.get_data(path, watch);
+ if let Ok(..) = zk_result {
+ Some(String::from_utf8(zk_result.unwrap().0).unwrap())
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ }
+}
+
+impl Default for ZookeeperRegistry {
+ fn default() -> ZookeeperRegistry {
+ let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+ Ok(val) => val,
+ Err(_) => {
+ let default_connect_string = "localhost:2181";
+ info!(
+ "No ZOOKEEPER_SERVERS env value, using {} as default.",
+ default_connect_string
+ );
+ default_connect_string.to_string()
+ }
+ };
+ println!(
+ "using external registry with it's connect string {}",
+ zk_connect_string.as_str()
+ );
+ ZookeeperRegistry::new(zk_connect_string.as_str())
+ }
}
impl Registry for ZookeeperRegistry {
type NotifyListener = MemoryNotifyListener;
fn register(&mut self, url: Url) -> Result<(), StdError> {
- todo!();
+ println!("register url: {}", url);
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP,
CreateMode::Ephemeral)?;
+ Ok(())
}
fn unregister(&mut self, url: Url) -> Result<(), StdError> {
- todo!();
+ let zk_path = format!(
+ "/{}/{}/{}/{}",
+ DUBBO_KEY,
+ url.service_name,
+ PROVIDERS_KEY,
+ url.encoded_raw_url_string()
+ );
+ self.delete_path(zk_path.as_str());
+ Ok(())
}
+ // for consumer to find the changes of providers
fn subscribe(&self, url: Url, listener: Self::NotifyListener) ->
Result<(), StdError> {
- let binding = url.get_service_name();
- let service_name = binding.get(0).unwrap();
- let app_name = self.get_app_name(service_name.clone());
- let path = self.root_path.clone() + "/" + &app_name;
- if self.listeners.read().unwrap().get(service_name).is_some() {
+ let service_name = url.get_service_name();
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name,
PROVIDERS_KEY);
+ if self
+ .listeners
+ .read()
+ .unwrap()
+ .get(service_name.as_str())
+ .is_some()
+ {
return Ok(());
}
@@ -156,37 +288,32 @@ impl Registry for ZookeeperRegistry {
.insert(service_name.to_string(), Arc::clone(&arc_listener));
let zk_listener = self.create_listener(
- path.clone(),
+ zk_path.clone(),
service_name.to_string(),
Arc::clone(&arc_listener),
);
- let res = self.zk_client.get_children_w(&path, zk_listener);
- let result: Vec<Url> = res
- .unwrap()
- .iter()
- .map(|node_key| {
- let zk_res = self.zk_client.get_data(
- &(self.root_path.clone() + "/" + &app_name + "/" +
&node_key),
- false,
- );
- let vec_u8 = zk_res.unwrap().0;
- let sstr = std::str::from_utf8(&vec_u8).unwrap();
- let instance: ZkServiceInstance =
serde_json::from_str(sstr).unwrap();
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- instance.get_host(),
- instance.get_port(),
- service_name
- ))
- .unwrap();
- url
- })
- .collect();
-
- info!("notifing {}->{:?}", service_name, result);
+ let zk_changed_paths = self.zk_client.get_children_w(&zk_path,
zk_listener);
+ let result = match zk_changed_paths {
+ Err(err) => {
+ error!("zk subscribe error: {}", err);
+ Vec::new()
+ }
+ Ok(urls) => urls
+ .iter()
+ .map(|node_key| {
+ let provider_url: Url = urlencoding::decode(node_key)
+ .unwrap()
+ .to_string()
+ .as_str()
+ .into();
+ provider_url
+ })
+ .collect(),
+ };
+ info!("notifying {}->{:?}", service_name, result);
arc_listener.notify(ServiceEvent {
- key: service_name.to_string(),
+ key: service_name,
action: String::from("ADD"),
service: result,
});
@@ -201,7 +328,6 @@ impl Registry for ZookeeperRegistry {
pub struct ServiceInstancesChangedListener {
zk_client: Arc<ZooKeeper>,
path: String,
-
service_name: String,
listener: Arc<MemoryNotifyListener>,
}
@@ -213,40 +339,26 @@ impl Watcher for ServiceInstancesChangedListener {
let event_path = path.clone();
let dirs = self
.zk_client
- .get_children(&event_path.clone(), false)
+ .get_children(&event_path, false)
.expect("msg");
let result: Vec<Url> = dirs
.iter()
.map(|node_key| {
- let zk_res = self
- .zk_client
- .get_data(&(event_path.clone() + "/" + node_key),
false);
- let vec_u8 = zk_res.unwrap().0;
- let sstr = std::str::from_utf8(&vec_u8).unwrap();
- let instance: ZkServiceInstance =
serde_json::from_str(sstr).unwrap();
- let url = Url::from_url(&format!(
- "triple://{}:{}/{}",
- instance.get_host(),
- instance.get_port(),
- self.service_name
- ))
- .unwrap();
- url
+ let provider_url: Url = node_key.as_str().into();
+ provider_url
})
.collect();
-
let res = self.zk_client.get_children_w(
&path,
ServiceInstancesChangedListener {
zk_client: Arc::clone(&self.zk_client),
path: path.clone(),
-
service_name: self.service_name.clone(),
listener: Arc::clone(&self.listener),
},
);
- info!("notifing {}->{:?}", self.service_name, result);
+ info!("notify {}->{:?}", self.service_name, result);
self.listener.notify(ServiceEvent {
key: self.service_name.clone(),
action: String::from("ADD"),
@@ -258,48 +370,102 @@ impl Watcher for ServiceInstancesChangedListener {
impl NotifyListener for ServiceInstancesChangedListener {
fn notify(&self, event: ServiceEvent) {
- todo!()
+ self.listener.notify(event);
}
fn notify_all(&self, event: ServiceEvent) {
- todo!()
+ self.listener.notify(event);
}
}
-#[test]
-fn it_works() {
- let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
- let zk_client =
- ZooKeeper::connect(connect_string, Duration::from_secs(15),
LoggingWatcher).unwrap();
- let watcher = Arc::new(Some(TestZkWatcher {
- watcher: Arc::new(None),
- }));
- watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
- let x = watcher.as_ref().expect("");
- zk_client.get_children_w("/test", x);
- zk_client.delete("/test/a", None);
- zk_client.delete("/test/b", None);
- let zk_res = zk_client.create(
- "/test/a",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let zk_res = zk_client.create(
- "/test/b",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- zk_client.close();
+impl ClusterRegistryIntegration for ZookeeperRegistry {
+ fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
+ todo!()
+ }
}
-struct TestZkWatcher {
- pub watcher: Arc<Option<TestZkWatcher>>,
-}
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
-impl Watcher for TestZkWatcher {
- fn handle(&self, event: WatchedEvent) {
- println!("event: {:?}", event);
+ use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
+
+ use crate::zookeeper_registry::ZookeeperRegistry;
+
+ struct TestZkWatcher {
+ pub watcher: Arc<Option<TestZkWatcher>>,
+ }
+
+ impl Watcher for TestZkWatcher {
+ fn handle(&self, event: WatchedEvent) {
+ println!("event: {:?}", event);
+ }
+ }
+
+ #[test]
+ fn zk_read_write_watcher() {
+ //
https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
+ // using ENV to set zookeeper server urls
+ let zkr = ZookeeperRegistry::default();
+ let zk_client = zkr.get_client();
+ let watcher = TestZkWatcher {
+ watcher: Arc::new(None),
+ };
+ if zk_client.exists("/test", true).is_err() {
+ zk_client
+ .create(
+ "/test",
+ vec![1, 3],
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ )
+ .unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let result = zk_client.get_children_w("/test", watcher);
+ assert!(result.is_ok());
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/a", None).unwrap();
+ }
+ if zk_client.exists("/test/a", true).is_err() {
+ zk_client.delete("/test/b", None).unwrap();
+ }
+ let zk_res = zk_client.create(
+ "/test/a",
+ "hello".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let zk_res = zk_client.create(
+ "/test/b",
+ "world".into(),
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let test_a_result = zk_client.get_data("/test", true);
+ assert!(test_a_result.is_ok());
+ let vec1 = test_a_result.unwrap().0;
+ // data in /test should equals to "hello"
+ assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
+ zk_client.close().unwrap()
+ }
+
+ #[test]
+ fn create_path_with_parent_check() {
+ let zkr = ZookeeperRegistry::default();
+ let path = "/du1bbo/test11111";
+ let data = "hello";
+ // creating a child on a not exists parent, throw a NoNode error.
+ // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
+ // assert!(result.is_err());
+ let create_with_parent_check_result =
+ zkr.create_path_with_parent_check(path, data,
CreateMode::Ephemeral);
+ assert!(create_with_parent_check_result.is_ok());
+ assert_eq!(data, zkr.get_data(path, false).unwrap());
}
}
diff --git a/scripts/ci-check.sh b/scripts/ci-check.sh
new file mode 100755
index 0000000..5ffd46d
--- /dev/null
+++ b/scripts/ci-check.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#/*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+cargo fmt --all -- --check
+# use stable channel
+cargo check
+target/debug/greeter-server && target/debug/greeter-client && sleep 3s ;