This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b98d724  Rft:  Merge service_discovery branch to main (#114)
b98d724 is described below

commit b98d724ef4c130f6321f67c6c707e4766fe69660
Author: 墨舟 <[email protected]>
AuthorDate: Mon Feb 20 11:42:32 2023 +0800

    Rft:  Merge service_discovery branch to main (#114)
    
    * implement service discovery
    
    * update github action to run ci (#63)
    
    * update github action to run ci
    
    * update github action, copy from main branch
    
    * load balance and service registry closed loop (#105)
    
    * feat(cluster): loadbalance types
    
    * feat(rpc): types alias for cluster invoker
    
    * feat(cluster): integration
    
    * feat(cluster): integration with examples
    
    * feat(cluster.loadbalance): greeter example with default random loadbalance passed
    
    * feat(cluster.loadbalance): completing roundrobin arithmetic and fixing compile warns.
    
    * typo
    
    * fix compile warns
    
    * fix rustfmt check fails.
    
    * fix cargo check fails(due to the use of nightly channel locally).
    
    * fix: default yaml config parse failed in ci.
    
    * ci actions zk test
    
    * feat(registry): zk support
    
    * feat(registry): zk support, connected to zk
    
    * feat(registry): provider.services  key as service name
    
    * feat(registry): serviceKey and configuration files, aligned to dubbo ecology
    
    * feat(commons): tested Url impl
    
    * feat(commons): tested Url impl
    
    * feat(zk): interface service discovery
    
    * feat(zk): create_path_with_parent_check
    
    * feat(zk): export bug fixed.
    
    * feat: merged branch main to service_discovery
    
    * cargo fmt
    
    * Rft: merge from main branch.
    
    ---------
    
    Co-authored-by: luyanbo <[email protected]>
    Co-authored-by: Ken Liu <[email protected]>
    Co-authored-by: Yang Yang <[email protected]>
---
 .github/workflows/github-actions.yml               |  12 +-
 Cargo.toml                                         |   2 +-
 config/src/config.rs                               |  68 ++--
 config/src/lib.rs                                  |   1 +
 config/src/protocol.rs                             |  54 +--
 config/src/provider.rs                             |   3 +-
 config/src/{lib.rs => registry.rs}                 |  21 +-
 config/src/service.rs                              |  33 +-
 dubbo-build/src/client.rs                          |  21 +-
 dubbo/Cargo.toml                                   |   2 +
 dubbo/src/cluster/directory.rs                     |   8 +-
 .../consts.rs => cluster/loadbalance/impls/mod.rs} |   6 +-
 dubbo/src/cluster/loadbalance/impls/random.rs      |  59 +++
 dubbo/src/cluster/loadbalance/impls/roundrobin.rs  |  85 +++++
 .../lib.rs => dubbo/src/cluster/loadbalance/mod.rs |  34 +-
 .../src/cluster/loadbalance/types.rs               |  31 +-
 dubbo/src/cluster/mod.rs                           |  17 +-
 dubbo/src/cluster/support/cluster_invoker.rs       | 147 ++++++++
 .../{common/consts.rs => cluster/support/mod.rs}   |   6 +-
 dubbo/src/codegen.rs                               |   5 +-
 dubbo/src/common/consts.rs                         |  12 +
 dubbo/src/common/url.rs                            | 207 ++++++++---
 dubbo/src/framework.rs                             | 143 ++++----
 dubbo/src/invocation.rs                            |   8 +-
 dubbo/src/protocol/mod.rs                          |   9 +-
 dubbo/src/protocol/triple/triple_invoker.rs        |  29 +-
 dubbo/src/protocol/triple/triple_protocol.rs       |   8 +-
 .../lib.rs => dubbo/src/registry/integration.rs    |  19 +-
 dubbo/src/registry/memory_registry.rs              |  15 +-
 dubbo/src/registry/mod.rs                          |  18 +-
 dubbo/src/registry/protocol.rs                     |  42 ++-
 dubbo/src/registry/types.rs                        |  91 +++++
 dubbo/src/triple/client/builder.rs                 |  21 +-
 dubbo/src/triple/client/triple.rs                  | 128 ++++---
 dubbo/src/triple/compression.rs                    |   4 +-
 dubbo/src/triple/server/builder.rs                 |   8 +-
 dubbo/src/triple/transport/connection.rs           |   2 +
 examples/echo/src/echo/server.rs                   |  36 +-
 examples/echo/src/generated/grpc.examples.echo.rs  |  12 +-
 examples/greeter/Cargo.toml                        |   1 -
 examples/greeter/dubbo.yaml                        |  34 +-
 examples/greeter/src/greeter/client.rs             |   7 +-
 examples/greeter/src/greeter/server.rs             |  51 +--
 registry-nacos/src/nacos_registry.rs               |  18 +-
 registry-nacos/src/utils/mod.rs                    |  14 +-
 registry-zookeeper/Cargo.toml                      |   3 +-
 registry-zookeeper/src/zookeeper_registry.rs       | 400 +++++++++++++++------
 scripts/ci-check.sh                                |  23 ++
 48 files changed, 1383 insertions(+), 595 deletions(-)

diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 9e53db5..e8881e0 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -52,6 +52,13 @@ jobs:
   example-greeter:
     name: example/greeter
     runs-on: ubuntu-latest
+    services:
+      zoo1:
+        image: zookeeper:3.8
+        ports:
+          - 2181:2181
+        env:
+          ZOO_MY_ID: 1
     steps:
       - uses: actions/checkout@main
       - uses: actions-rs/toolchain@v1
@@ -83,6 +90,9 @@ jobs:
       - name: example greeter
         run: |
           ../../target/debug/greeter-server &
-          sleep 1s ;
+          sleep 3 
           ../../target/debug/greeter-client
+        env:
+          ZOOKEEPER_SERVERS: 127.0.0.1:2181
+          DUBBO_CONFIG_PATH: ./dubbo.yaml
         working-directory: examples/greeter
diff --git a/Cargo.toml b/Cargo.toml
index 785e903..d4404f2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,5 +10,5 @@ members = [
   "dubbo",
   "examples/echo",
   "examples/greeter",
-  "dubbo-build",
+  "dubbo-build"
 ]
diff --git a/config/src/config.rs b/config/src/config.rs
index 88bf0de..ed3a714 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,33 +17,33 @@
 
 use std::{collections::HashMap, env, fs};
 
+use crate::{protocol::Protocol, registry::RegistryConfig};
 use once_cell::sync::OnceCell;
 use serde::{Deserialize, Serialize};
+use tracing::trace;
 
 use super::{protocol::ProtocolConfig, provider::ProviderConfig, 
service::ServiceConfig};
 
 pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
 
 pub static GLOBAL_ROOT_CONFIG: OnceCell<RootConfig> = OnceCell::new();
+pub const DUBBO_CONFIG_PREFIX: &str = "dubbo";
 
 /// used to storage all structed config, from some source: cmd, file..;
 /// Impl Config trait, business init by read Config trait
 #[allow(dead_code)]
 #[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct RootConfig {
-    pub name: String,
-
     #[serde(default)]
-    pub service: HashMap<String, ServiceConfig>,
-    pub protocols: HashMap<String, ProtocolConfig>,
+    pub protocols: ProtocolConfig,
 
     #[serde(default)]
-    pub registries: HashMap<String, String>,
+    pub provider: ProviderConfig,
 
     #[serde(default)]
-    pub provider: ProviderConfig,
+    pub registries: HashMap<String, RegistryConfig>,
 
-    #[serde(skip_serializing, skip_deserializing)]
+    #[serde(default)]
     pub data: HashMap<String, String>,
 }
 
@@ -59,8 +59,6 @@ pub fn get_global_config() -> &'static RootConfig {
 impl RootConfig {
     pub fn new() -> Self {
         Self {
-            name: "dubbo".to_string(),
-            service: HashMap::new(),
             protocols: HashMap::new(),
             registries: HashMap::new(),
             provider: ProviderConfig::new(),
@@ -86,12 +84,11 @@ impl RootConfig {
 
         tracing::info!("current path: {:?}", env::current_dir());
         let data = fs::read(config_path)?;
-        let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
+        trace!("config data: {:?}", String::from_utf8(data.clone()));
+        let conf: HashMap<String, RootConfig> = 
serde_yaml::from_slice(&data).unwrap();
+        let root_config: RootConfig = 
conf.get(DUBBO_CONFIG_PREFIX).unwrap().clone();
         tracing::debug!("origin config: {:?}", conf);
-        for (name, svc) in conf.service.iter_mut() {
-            svc.name = name.to_string();
-        }
-        Ok(conf)
+        Ok(root_config)
     }
 
     pub fn test_config(&mut self) {
@@ -101,53 +98,30 @@ impl RootConfig {
 
         let service_config = ServiceConfig::default()
             .group("test".to_string())
-            .serializer("json".to_string())
             .version("1.0.0".to_string())
-            .protocol_names("triple".to_string())
-            // Currently, the hello_echo.rs doesn't support the url which like
-            // `{protocol}/{service config name}/{service name}(e.g. 
triple://0.0.0.0:8888/{service config name}/grpc.examples.echo.Echo).
-            // So we comment this line.
-            // .name("grpc.examples.echo.Echo".to_strdding())
-            .registry("zookeeper".to_string());
-
-        let triple_config = ProtocolConfig::default()
-            .name("triple".to_string())
-            .ip("0.0.0.0".to_string())
-            .port("8888".to_string())
-            .listener("tcp".to_string());
-
-        let service_config = 
service_config.add_protocol_configs(triple_config);
-        self.service
+            .protocol("triple".to_string())
+            .interface("grpc.examples.echo.Echo".to_string());
+
+        self.provider
+            .services
             .insert("grpc.examples.echo.Echo".to_string(), service_config);
-        self.service.insert(
+        self.provider.services.insert(
             "helloworld.Greeter".to_string(),
             ServiceConfig::default()
                 .group("test".to_string())
-                .serializer("json".to_string())
                 .version("1.0.0".to_string())
-                // .name("helloworld.Greeter".to_string())
-                .protocol_names("triple".to_string())
-                .registry("zookeeper".to_string()),
+                .interface("helloworld.Greeter".to_string())
+                .protocol("triple".to_string()),
         );
         self.protocols.insert(
             "triple".to_string(),
-            ProtocolConfig::default()
+            Protocol::default()
                 .name("triple".to_string())
                 .ip("0.0.0.0".to_string())
-                .port("8889".to_string())
-                .listener("tcp".to_string()),
+                .port("8889".to_string()),
         );
 
-        provider.services = self.service.clone();
         self.provider = provider.clone();
-
-        let mut registries = HashMap::new();
-        registries.insert(
-            "zookeeper".to_string(),
-            "zookeeper://localhost:2181".to_string(),
-        );
-        self.registries = registries;
-
         println!("provider config: {:?}", provider);
         // 通过环境变量读取某个文件。加在到内存中
         self.data.insert(
diff --git a/config/src/lib.rs b/config/src/lib.rs
index 70a33cd..e56d933 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -18,6 +18,7 @@
 pub mod config;
 pub mod protocol;
 pub mod provider;
+pub mod registry;
 pub mod service;
 
 pub use config::*;
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 632e912..cdc357a 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -19,18 +19,26 @@ use std::collections::HashMap;
 
 use serde::{Deserialize, Serialize};
 
+pub const DEFAULT_PROTOCOL: &str = "triple";
+
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
-pub struct ProtocolConfig {
+pub struct Protocol {
     pub ip: String,
     pub port: String,
     pub name: String,
-    pub listener: String,
 
     #[serde(skip_serializing, skip_deserializing)]
     pub params: HashMap<String, String>,
 }
 
-impl ProtocolConfig {
+pub type ProtocolConfig = HashMap<String, Protocol>;
+
+pub trait ProtocolRetrieve {
+    fn get_protocol(&self, protocol_key: &str) -> Option<Protocol>;
+    fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol;
+}
+
+impl Protocol {
     pub fn name(self, name: String) -> Self {
         Self { name, ..self }
     }
@@ -43,30 +51,36 @@ impl ProtocolConfig {
         Self { port, ..self }
     }
 
-    pub fn listener(self, listener: String) -> Self {
-        Self { listener, ..self }
-    }
-
     pub fn params(self, params: HashMap<String, String>) -> Self {
         Self { params, ..self }
     }
 
-    pub fn add_param(mut self, key: String, value: String) -> Self {
-        self.params.insert(key, value);
-        self
+    pub fn to_url(self) -> String {
+        format!("{}://{}:{}", self.name, self.ip, self.port)
     }
+}
 
-    pub fn to_url(&self) -> String {
-        let mut params_vec: Vec<String> = Vec::new();
-        for (k, v) in self.params.iter() {
-            // let tmp = format!("{}={}", k, v);
-            params_vec.push(format!("{}={}", k, v));
+impl ProtocolRetrieve for ProtocolConfig {
+    fn get_protocol(&self, protocol_key: &str) -> Option<Protocol> {
+        let result = self.get(protocol_key);
+        if let Some(..) = result {
+            Some(result.unwrap().clone())
+        } else {
+            None
         }
-        let param = params_vec.join("&");
+    }
 
-        format!(
-            "{}://{}:{}?listener={}&{}",
-            self.name, self.ip, self.port, self.listener, param
-        )
+    fn get_protocol_or_default(&self, protocol_key: &str) -> Protocol {
+        let result = self.get_protocol(protocol_key);
+        if let Some(..) = result {
+            result.unwrap().clone()
+        } else {
+            let result = self.get_protocol(protocol_key);
+            if result.is_none() {
+                panic!("default triple protocol dose not defined.")
+            } else {
+                result.unwrap()
+            }
+        }
     }
 }
diff --git a/config/src/provider.rs b/config/src/provider.rs
index 120dc84..ccb9cf8 100644
--- a/config/src/provider.rs
+++ b/config/src/provider.rs
@@ -25,10 +25,9 @@ use super::service::ServiceConfig;
 pub struct ProviderConfig {
     #[serde(default)]
     pub registry_ids: Vec<String>,
-
     #[serde(default)]
     pub protocol_ids: Vec<String>,
-
+    #[serde(default)]
     pub services: HashMap<String, ServiceConfig>,
 }
 
diff --git a/config/src/lib.rs b/config/src/registry.rs
similarity index 78%
copy from config/src/lib.rs
copy to config/src/registry.rs
index 70a33cd..c2348b0 100644
--- a/config/src/lib.rs
+++ b/config/src/registry.rs
@@ -14,19 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use serde::{Deserialize, Serialize};
 
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
-
-pub use config::*;
-
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
-    }
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+pub struct RegistryConfig {
+    #[serde(default)]
+    pub protocol: String,
+    #[serde(default)]
+    pub address: String,
 }
diff --git a/config/src/service.rs b/config/src/service.rs
index a876c0a..1f85a92 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -15,30 +15,19 @@
  * limitations under the License.
  */
 
-use std::collections::HashMap;
-
 use serde::{Deserialize, Serialize};
 
-use super::protocol::ProtocolConfig;
-
 #[derive(Debug, Default, Serialize, Deserialize, Clone)]
 pub struct ServiceConfig {
     pub version: String,
     pub group: String,
-
-    #[serde(skip_serializing, skip_deserializing)]
-    pub name: String,
     pub protocol: String,
-    pub registry: String,
-    pub serializer: String,
-
-    #[serde(default)]
-    pub protocol_configs: HashMap<String, ProtocolConfig>,
+    pub interface: String,
 }
 
 impl ServiceConfig {
-    pub fn name(self, name: String) -> Self {
-        Self { name, ..self }
+    pub fn interface(self, interface: String) -> Self {
+        Self { interface, ..self }
     }
 
     pub fn version(self, version: String) -> Self {
@@ -49,24 +38,10 @@ impl ServiceConfig {
         Self { group, ..self }
     }
 
-    pub fn protocol_names(self, protocol: String) -> Self {
+    pub fn protocol(self, protocol: String) -> Self {
         Self { protocol, ..self }
     }
 
-    pub fn serializer(self, serializer: String) -> Self {
-        Self { serializer, ..self }
-    }
-
-    pub fn registry(self, registry: String) -> Self {
-        Self { registry, ..self }
-    }
-
-    pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> 
Self {
-        self.protocol_configs
-            .insert(protocol_config.name.clone(), protocol_config);
-        Self { ..self }
-    }
-
     // pub fn get_url(&self) -> Vec<Url> {
     //     let mut urls = Vec::new();
     //     for (_, conf) in self.protocol_configs.iter() {
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 05e8e89..1e1c9fb 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -90,15 +90,10 @@ pub fn generate<T: Service>(
                     }
                 }
 
-                // pub fn with_filter<F>(self, filter: F) -> 
#service_ident<FilterService<T, F>>
-                // where
-                //     F: Filter,
-                // {
-                //     let inner = self.inner.with_filter(filter);
-                //     #service_ident {
-                //         inner,
-                //     }
-                // }
+                pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self 
{
+                    self.inner = self.inner.with_cluster(invoker);
+                    self
+                }
 
                 #methods
 
@@ -189,7 +184,7 @@ fn generate_unary<T: Method>(
         ) -> Result<Response<#response>, dubbo::status::Status> {
            let codec = #codec_name::<#request, #response>::default();
            let invocation = RpcInvocation::default()
-            .with_servie_unique_name(String::from(#service_unique_name))
+            .with_service_unique_name(String::from(#service_unique_name))
             .with_method_name(String::from(#method_name));
            let path = http::uri::PathAndQuery::from_static(#path);
            self.inner.unary(
@@ -223,7 +218,7 @@ fn generate_server_streaming<T: Method>(
 
             let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
-             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.server_streaming(
@@ -256,7 +251,7 @@ fn generate_client_streaming<T: Method>(
         ) -> Result<Response<#response>, dubbo::status::Status> {
             let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
-             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.client_streaming(
@@ -289,7 +284,7 @@ fn generate_streaming<T: Method>(
         ) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
             let codec = #codec_name::<#request, #response>::default();
             let invocation = RpcInvocation::default()
-             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_service_unique_name(String::from(#service_unique_name))
              .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
             self.inner.bidi_streaming(
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index e319259..5369852 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,6 +34,8 @@ axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
 aws-smithy-http = "0.54.1"
+itertools = "0.10.1"
+urlencoding = "2.1.2"
 
 dubbo-config = {path = "../config", version = "0.2.0"}
 
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index d1b5373..02946a9 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -32,7 +32,7 @@ use crate::{
 ///
 /// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
 pub trait Directory: Debug + DirectoryClone {
-    fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
 }
 
 pub trait DirectoryClone {
@@ -77,9 +77,9 @@ impl StaticDirectory {
 }
 
 impl Directory for StaticDirectory {
-    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
         let url = Url::from_url(&format!(
-            "triple://{}:{}/{}",
+            "tri://{}:{}/{}",
             self.uri.host().unwrap(),
             self.uri.port().unwrap(),
             invocation.get_target_service_unique_name(),
@@ -121,7 +121,7 @@ impl DirectoryClone for RegistryDirectory {
 }
 
 impl Directory for RegistryDirectory {
-    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+    fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
         let service_name = invocation.get_target_service_unique_name();
 
         let url = Url::from_url(&format!(
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/cluster/loadbalance/impls/mod.rs
similarity index 85%
copy from dubbo/src/common/consts.rs
copy to dubbo/src/cluster/loadbalance/impls/mod.rs
index c05c2c7..5a84af8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/cluster/loadbalance/impls/mod.rs
@@ -14,7 +14,5 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
-pub const PROTOCOL: &str = "protocol";
-pub const REGISTRY: &str = "registry";
+pub mod random;
+pub mod roundrobin;
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
new file mode 100644
index 0000000..a5ca7df
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/random.rs
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+    fmt::{Debug, Formatter},
+    sync::Arc,
+};
+
+use crate::{
+    cluster::loadbalance::types::{LoadBalance, Metadata},
+    codegen::RpcInvocation,
+    common::url::Url,
+};
+
+pub struct RandomLoadBalance {
+    pub metadata: Metadata,
+}
+
+impl Default for RandomLoadBalance {
+    fn default() -> Self {
+        RandomLoadBalance {
+            metadata: Metadata::new("random"),
+        }
+    }
+}
+
+impl Debug for RandomLoadBalance {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RandomLoadBalance")
+    }
+}
+
+impl LoadBalance for RandomLoadBalance {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        _url: Option<Url>,
+        _invocation: Arc<RpcInvocation>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let index = rand::random::<usize>() % invokers.len();
+        Some(invokers[index].clone())
+    }
+}
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
new file mode 100644
index 0000000..cd951bb
--- /dev/null
+++ b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{
+    collections::HashMap,
+    fmt::{Debug, Formatter},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, RwLock,
+    },
+};
+
+use crate::{
+    cluster::loadbalance::types::{LoadBalance, Metadata},
+    codegen::RpcInvocation,
+    common::url::Url,
+};
+
+pub struct RoundRobinLoadBalance {
+    pub metadata: Metadata,
+    pub counter_map: RwLock<HashMap<String, AtomicUsize>>,
+}
+
+impl Default for RoundRobinLoadBalance {
+    fn default() -> Self {
+        RoundRobinLoadBalance {
+            metadata: Metadata::new("roundrobin"),
+            counter_map: RwLock::new(HashMap::new()),
+        }
+    }
+}
+
+impl Debug for RoundRobinLoadBalance {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RoundRobinLoadBalance")
+    }
+}
+
+impl RoundRobinLoadBalance {
+    fn guarantee_counter_key(&self, key: &str) {
+        let contained = self.counter_map.try_read().unwrap().contains_key(key);
+        if !contained {
+            self.counter_map
+                .try_write()
+                .unwrap()
+                .insert(key.to_string(), AtomicUsize::new(0));
+        }
+    }
+}
+
+impl LoadBalance for RoundRobinLoadBalance {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        _url: Option<Url>,
+        invocation: Arc<RpcInvocation>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let fingerprint = invocation.unique_fingerprint();
+        self.guarantee_counter_key(fingerprint.as_str());
+        let index = self
+            .counter_map
+            .try_read()
+            .unwrap()
+            .get(fingerprint.as_str())?
+            .fetch_add(1, Ordering::SeqCst)
+            % invokers.len();
+        Some(invokers[index].clone())
+    }
+}
diff --git a/config/src/lib.rs b/dubbo/src/cluster/loadbalance/mod.rs
similarity index 51%
copy from config/src/lib.rs
copy to dubbo/src/cluster/loadbalance/mod.rs
index 70a33cd..1d04f7f 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/cluster/loadbalance/mod.rs
@@ -14,19 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use std::collections::HashMap;
 
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
+use lazy_static::lazy_static;
 
-pub use config::*;
+use crate::cluster::loadbalance::{
+    impls::{random::RandomLoadBalance, roundrobin::RoundRobinLoadBalance},
+    types::BoxLoadBalance,
+};
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
-    }
+pub mod impls;
+pub mod types;
+
+lazy_static! {
+    pub static ref LOAD_BALANCE_EXTENSIONS: HashMap<String, BoxLoadBalance> =
+        init_loadbalance_extensions();
+}
+
+fn init_loadbalance_extensions() -> HashMap<String, BoxLoadBalance> {
+    let mut loadbalance_map: HashMap<String, BoxLoadBalance> = HashMap::new();
+    loadbalance_map.insert("random".to_string(), 
Box::new(RandomLoadBalance::default()));
+    loadbalance_map.insert(
+        "roundrobin".to_string(),
+        Box::new(RoundRobinLoadBalance::default()),
+    );
+    loadbalance_map
 }
diff --git a/config/src/lib.rs b/dubbo/src/cluster/loadbalance/types.rs
similarity index 63%
copy from config/src/lib.rs
copy to dubbo/src/cluster/loadbalance/types.rs
index 70a33cd..ac31176 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/cluster/loadbalance/types.rs
@@ -15,18 +15,27 @@
  * limitations under the License.
  */
 
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
+use std::{fmt::Debug, sync::Arc};
 
-pub use config::*;
+use crate::{codegen::RpcInvocation, common::url::Url};
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
+pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
+
+pub trait LoadBalance: Debug {
+    fn select(
+        &self,
+        invokers: Arc<Vec<Url>>,
+        url: Option<Url>,
+        invocation: Arc<RpcInvocation>,
+    ) -> Option<Url>;
+}
+
+pub struct Metadata {
+    pub name: &'static str,
+}
+
+impl Metadata {
+    pub fn new(name: &'static str) -> Self {
+        Metadata { name }
     }
 }
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 2221afc..b6d8a7c 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -20,13 +20,11 @@ use std::{sync::Arc, task::Poll};
 use aws_smithy_http::body::SdkBody;
 use tower_service::Service;
 
-use crate::{
-    common::url::Url,
-    empty_body,
-    protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker},
-};
+use crate::{empty_body, protocol::BoxInvoker};
 
 pub mod directory;
+pub mod loadbalance;
+pub mod support;
 
 pub trait Directory {
     fn list(&self, meta: String) -> Vec<BoxInvoker>;
@@ -90,10 +88,11 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
 pub struct MockDirectory {}
 
 impl Directory for MockDirectory {
-    fn list(&self, meta: String) -> Vec<BoxInvoker> {
-        tracing::info!("MockDirectory: {}", meta);
-        let u = 
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
-        vec![Box::new(TripleInvoker::new(u))]
+    fn list(&self, _meta: String) -> Vec<BoxInvoker> {
+        // tracing::info!("MockDirectory: {}", meta);
+        // let u = 
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+        // vec![Box::new(TripleInvoker::new(u))]
+        todo!()
     }
 
     fn is_empty(&self) -> bool {
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
new file mode 100644
index 0000000..e00b849
--- /dev/null
+++ b/dubbo/src/cluster/support/cluster_invoker.rs
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use aws_smithy_http::body::SdkBody;
+use std::{str::FromStr, sync::Arc};
+
+use http::{uri::PathAndQuery, Request};
+
+use crate::{
+    cluster::{
+        loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS},
+        support::DEFAULT_LOADBALANCE,
+    },
+    codegen::{Directory, RegistryDirectory, TripleClient},
+    common::url::Url,
+    invocation::RpcInvocation,
+};
+
+#[derive(Debug, Clone)]
+pub struct ClusterInvoker {
+    directory: Arc<RegistryDirectory>,
+    destroyed: bool,
+}
+
+pub trait ClusterInvokerSelector {
+    /// Select a invoker using loadbalance policy.
+    fn select(
+        &self,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+        excluded: Arc<Vec<Url>>,
+    ) -> Option<Url>;
+
+    fn do_select(
+        &self,
+        loadbalance_key: Option<&str>,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+    ) -> Option<Url>;
+}
+
+pub trait ClusterRequestBuilder {
+    fn build_req(
+        &self,
+        triple_client: &mut TripleClient,
+        path: http::uri::PathAndQuery,
+        invocation: Arc<RpcInvocation>,
+        body: SdkBody,
+    ) -> http::Request<SdkBody>;
+}
+
+impl ClusterInvoker {
+    pub fn with_directory(registry_directory: RegistryDirectory) -> Self {
+        ClusterInvoker {
+            directory: Arc::new(registry_directory),
+            destroyed: false,
+        }
+    }
+
+    pub fn directory(&self) -> Arc<RegistryDirectory> {
+        self.directory.clone()
+    }
+
+    pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance {
+        if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) {
+            LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap()
+        } else {
+            println!(
+                "loadbalance {} not found, use default loadbalance {}",
+                loadbalance_key, DEFAULT_LOADBALANCE
+            );
+            LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap()
+        }
+    }
+
+    pub fn is_available(&self, invocation: Arc<RpcInvocation>) -> bool {
+        !self.destroyed() && !self.directory.list(invocation).is_empty()
+    }
+
+    pub fn destroyed(&self) -> bool {
+        self.destroyed
+    }
+}
+
+impl ClusterInvokerSelector for ClusterInvoker {
+    fn select(
+        &self,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+        _excluded: Arc<Vec<Url>>,
+    ) -> Option<Url> {
+        if invokers.is_empty() {
+            return None;
+        }
+        let instance_count = invokers.len();
+        return if instance_count == 1 {
+            Some(invokers.as_ref().first()?.clone())
+        } else {
+            let loadbalance = Some(DEFAULT_LOADBALANCE);
+            self.do_select(loadbalance, invocation, invokers)
+        };
+    }
+
+    /// picking instance invoker url from registry directory
+    fn do_select(
+        &self,
+        loadbalance_key: Option<&str>,
+        invocation: Arc<RpcInvocation>,
+        invokers: Arc<Vec<Url>>,
+    ) -> Option<Url> {
+        let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE));
+        loadbalance.select(invokers, None, invocation)
+    }
+}
+
+impl ClusterRequestBuilder for ClusterInvoker {
+    fn build_req(
+        &self,
+        triple_client: &mut TripleClient,
+        path: PathAndQuery,
+        invocation: Arc<RpcInvocation>,
+        body: SdkBody,
+    ) -> Request<SdkBody> {
+        let invokers = self.directory.list(invocation.clone());
+        let invoker_url = self
+            .select(invocation, Arc::new(invokers), Arc::new(Vec::new()))
+            .expect("no valid provider");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port))
+                .unwrap();
+        triple_client.map_request(http_uri, path, body)
+    }
+}
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/cluster/support/mod.rs
similarity index 85%
copy from dubbo/src/common/consts.rs
copy to dubbo/src/cluster/support/mod.rs
index c05c2c7..ae42cc2 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/cluster/support/mod.rs
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 
-pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
-pub const PROTOCOL: &str = "protocol";
-pub const REGISTRY: &str = "registry";
+pub mod cluster_invoker;
+
+pub const DEFAULT_LOADBALANCE: &str = "random";
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 5d0a273..98d784f 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -27,7 +27,10 @@ pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
 pub use super::{
-    cluster::directory::{Directory, RegistryDirectory},
+    cluster::{
+        directory::{Directory, RegistryDirectory},
+        support::cluster_invoker::ClusterInvoker,
+    },
     empty_body,
     invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
     protocol::{triple::triple_invoker::TripleInvoker, Invoker},
diff --git a/dubbo/src/common/consts.rs b/dubbo/src/common/consts.rs
index c05c2c7..17993c8 100644
--- a/dubbo/src/common/consts.rs
+++ b/dubbo/src/common/consts.rs
@@ -18,3 +18,15 @@
 pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
 pub const PROTOCOL: &str = "protocol";
 pub const REGISTRY: &str = "registry";
+
+// URL key
+pub const DUBBO_KEY: &str = "dubbo";
+pub const PROVIDERS_KEY: &str = "providers";
+pub const LOCALHOST_IP: &str = "127.0.0.1";
+pub const METADATA_MAPPING_KEY: &str = "mapping";
+pub const VERSION_KEY: &str = "version";
+pub const GROUP_KEY: &str = "group";
+pub const INTERFACE_KEY: &str = "interface";
+pub const ANYHOST_KEY: &str = "anyhost";
+pub const SIDE_KEY: &str = "side";
+pub const TIMESTAMP_KEY: &str = "timestamp";
diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs
index 29f9e2c..2a36d72 100644
--- a/dubbo/src/common/url.rs
+++ b/dubbo/src/common/url.rs
@@ -15,16 +15,26 @@
  * limitations under the License.
  */
 
-use std::collections::HashMap;
+use std::{
+    collections::HashMap,
+    fmt::{Display, Formatter},
+};
+
+use crate::common::consts::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
+use http::Uri;
 
 #[derive(Debug, Clone, Default, PartialEq)]
 pub struct Url {
-    pub uri: String,
-    pub protocol: String,
+    pub raw_url_string: String,
+    // value of scheme is different to protocol name, eg. triple -> tri://
+    pub scheme: String,
     pub location: String,
     pub ip: String,
     pub port: String,
-    pub service_key: Vec<String>,
+    // serviceKey format in dubbo java and go '{group}/{interfaceName}:{version}'
+    pub service_key: String,
+    // same to interfaceName
+    pub service_name: String,
     pub params: HashMap<String, String>,
 }
 
@@ -41,88 +51,189 @@ impl Url {
                 tracing::error!("fail to parse url({}), err: {:?}", url, err);
             })
             .unwrap();
-        let mut u = Self {
-            uri: uri.to_string(),
-            protocol: uri.scheme_str()?.to_string(),
+        let query = uri.path_and_query().unwrap().query();
+        let mut url_inst = Self {
+            raw_url_string: url.to_string(),
+            scheme: uri.scheme_str()?.to_string(),
             ip: uri.authority()?.host().to_string(),
             port: uri.authority()?.port()?.to_string(),
             location: uri.authority()?.to_string(),
-            service_key: uri
-                .path()
-                .trim_start_matches('/')
-                .split(',')
-                .map(|x| x.to_string())
-                .collect::<Vec<_>>(),
-            params: HashMap::new(),
+            service_key: uri.path().trim_start_matches('/').to_string(),
+            service_name: uri.path().trim_start_matches('/').to_string(),
+            params: if let Some(..) = query {
+                Url::decode(query.unwrap())
+            } else {
+                HashMap::new()
+            },
         };
-        if uri.query().is_some() {
-            u.decode(uri.query().unwrap().to_string());
-            u.service_key = u
-                .get_param("service_names".to_string())
-                .unwrap()
-                .split(',')
-                .map(|x| x.to_string())
-                .collect::<Vec<_>>();
-        }
-
-        Some(u)
+        url_inst.renew_raw_url_string();
+        Some(url_inst)
     }
 
-    pub fn get_service_name(&self) -> Vec<String> {
+    pub fn get_service_key(&self) -> String {
         self.service_key.clone()
     }
 
-    pub fn get_param(&self, key: String) -> Option<String> {
-        self.params.get(&key).cloned()
+    pub fn get_service_name(&self) -> String {
+        self.service_name.clone()
     }
 
-    pub fn encode_param(&self) -> String {
+    pub fn get_param(&self, key: &str) -> Option<String> {
+        self.params.get(key).cloned()
+    }
+
+    fn encode_param(&self) -> String {
         let mut params_vec: Vec<String> = Vec::new();
         for (k, v) in self.params.iter() {
             // let tmp = format!("{}={}", k, v);
             params_vec.push(format!("{}={}", k, v));
         }
-        params_vec.join("&")
+        if params_vec.is_empty() {
+            "".to_string()
+        } else {
+            format!("?{}", params_vec.join("&"))
+        }
+    }
+
+    pub fn params_count(&self) -> usize {
+        self.params.len()
     }
 
-    pub fn decode(&mut self, params: String) {
-        let p: Vec<String> = params.split('&').map(|v| v.trim().to_string()).collect();
+    fn decode(raw_query_string: &str) -> HashMap<String, String> {
+        let mut params = HashMap::new();
+        let p: Vec<String> = raw_query_string
+            .split('&')
+            .map(|v| v.trim().to_string())
+            .collect();
         for v in p.iter() {
             let values: Vec<String> = v.split('=').map(|v| v.trim().to_string()).collect();
             if values.len() != 2 {
                 continue;
             }
-            self.params.insert(values[0].clone(), values[1].clone());
+            params.insert(values[0].clone(), values[1].clone());
         }
+        params
+    }
+
+    pub fn set_param(&mut self, key: &str, value: &str) {
+        self.params.insert(key.to_string(), value.to_string());
+        self.renew_raw_url_string();
+    }
+
+    pub fn raw_url_string(&self) -> String {
+        self.raw_url_string.clone()
+    }
+
+    pub fn encoded_raw_url_string(&self) -> String {
+        urlencoding::encode(self.raw_url_string.as_str()).to_string()
+    }
+
+    fn build_service_key(&self) -> String {
+        format!(
+            "{group}/{interfaceName}:{version}",
+            group = self.get_param(GROUP_KEY).unwrap_or("default".to_string()),
+            interfaceName = self.get_param(INTERFACE_KEY).unwrap_or("error".to_string()),
+            version = self.get_param(VERSION_KEY).unwrap_or("1.0.0".to_string())
+        )
     }
 
     pub fn to_url(&self) -> String {
-        format!("{}://{}:{}", self.protocol, self.ip, self.port)
+        self.raw_url_string()
+    }
+
+    fn renew_raw_url_string(&mut self) {
+        self.raw_url_string = format!(
+            "{}://{}:{}/{}{}",
+            self.scheme,
+            self.ip,
+            self.port,
+            self.service_name,
+            self.encode_param()
+        );
+        self.service_key = self.build_service_key()
+    }
+
+    // short_url is used for tcp listening
+    pub fn short_url(&self) -> String {
+        format!("{}://{}:{}", self.scheme, self.ip, self.port)
+    }
+}
+
+impl Display for Url {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.raw_url_string().as_str())
+    }
+}
+
+impl Into<Uri> for Url {
+    fn into(self) -> Uri {
+        self.raw_url_string.parse::<Uri>().unwrap()
+    }
+}
+
+impl From<&str> for Url {
+    fn from(url: &str) -> Self {
+        Url::from_url(url).unwrap()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::common::consts::{ANYHOST_KEY, VERSION_KEY};
 
     #[test]
     fn test_from_url() {
-        let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
-        println!("{:?}", u1.unwrap().get_service_name())
+        let mut u1 = Url::from_url("tri://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&\
+        application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&\
+        environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&\
+        module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&\
+        side=provider&timeout=3000&timestamp=1556509797245&version=1.0.0&application=test");
+        assert_eq!(
+            u1.as_ref().unwrap().service_key,
+            "default/com.ikurento.user.UserProvider:1.0.0"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param(ANYHOST_KEY)
+                .unwrap()
+                .as_str(),
+            "true"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param("default.timeout")
+                .unwrap()
+                .as_str(),
+            "10000"
+        );
+        assert_eq!(u1.as_ref().unwrap().scheme, "tri");
+        assert_eq!(u1.as_ref().unwrap().ip, "127.0.0.1");
+        assert_eq!(u1.as_ref().unwrap().port, "20000");
+        assert_eq!(u1.as_ref().unwrap().params_count(), 18);
+        u1.as_mut().unwrap().set_param("key1", "value1");
+        assert_eq!(
+            u1.as_ref().unwrap().get_param("key1").unwrap().as_str(),
+            "value1"
+        );
+        assert_eq!(
+            u1.as_ref()
+                .unwrap()
+                .get_param(VERSION_KEY)
+                .unwrap()
+                .as_str(),
+            "1.0.0"
+        );
     }
 
     #[test]
-    fn test_encode_params() {
-        let mut u = Url::default();
-        u.params.insert("method".to_string(), "GET".to_string());
-        u.params.insert("args".to_string(), "GET".to_string());
-
-        let en = u.encode_param();
-        println!("encode_params: {:?}", en);
-
-        let mut u1 = Url::default();
-        u1.decode(en);
-        println!("decode_params: {:?}", u1);
-        assert_eq!(u1, u);
+    fn test2() {
+        let url: Url = "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter".into();
+        assert_eq!(
+            url.raw_url_string(),
+            "tri://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter"
+        )
     }
 }
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index c8e4a9b..342ef6c 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,23 +15,33 @@
  * limitations under the License.
  */
 
-use std::{collections::HashMap, pin::Pin};
+use std::{
+    collections::HashMap,
+    error::Error,
+    pin::Pin,
+    sync::{Arc, Mutex},
+};
 
 use futures::{future, Future};
+use tracing::{debug, info};
 
 use crate::{
     common::url::Url,
     protocol::{BoxExporter, Protocol},
-    registry::protocol::RegistryProtocol,
+    registry::{
+        protocol::RegistryProtocol,
+        types::{Registries, RegistriesOperation},
+        BoxRegistry, Registry,
+    },
 };
-use dubbo_config::{get_global_config, RootConfig};
+use dubbo_config::{get_global_config, protocol::ProtocolRetrieve, RootConfig};
 
 // Invoker是否可以基于hyper写一个通用的
 
 #[derive(Default)]
 pub struct Dubbo {
     protocols: HashMap<String, Vec<Url>>,
-    registries: HashMap<String, Url>,
+    registries: Option<Registries>,
     service_registry: HashMap<String, Vec<Url>>, // registry: Urls
     config: Option<&'static RootConfig>,
 }
@@ -41,7 +51,7 @@ impl Dubbo {
         tracing_subscriber::fmt::init();
         Self {
             protocols: HashMap::new(),
-            registries: HashMap::new(),
+            registries: None,
             service_registry: HashMap::new(),
             config: None,
         }
@@ -52,80 +62,83 @@ impl Dubbo {
         self
     }
 
-    pub fn init(&mut self) {
-        let conf = self.config.get_or_insert_with(get_global_config);
-        tracing::debug!("global conf: {:?}", conf);
+    pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
+        if self.registries.is_none() {
+            self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
+        }
+        self.registries
+            .as_ref()
+            .unwrap()
+            .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+        self
+    }
 
-        for (name, url) in conf.registries.iter() {
-            self.registries.insert(
-                name.clone(),
-                Url::from_url(url).expect(&format!("url: {url} parse failed.")),
-            );
+    pub fn init(&mut self) -> Result<(), Box<dyn Error>> {
+        if self.config.is_none() {
+            self.config = Some(get_global_config())
         }
 
-        for (service_name, service_config) in conf.provider.services.iter() {
-            let protocol_url = match service_config
-                .protocol_configs
-                .get(&service_config.protocol)
+        let root_config = self.config.as_ref().unwrap();
+        debug!("global conf: {:?}", root_config);
+        // env::set_var("ZOOKEEPER_SERVERS",root_config);
+        for (_, service_config) in root_config.provider.services.iter() {
+            info!("init service name: {}", service_config.interface);
+            let url = if root_config
+                .protocols
+                .contains_key(service_config.protocol.as_str())
             {
-                Some(protocol_url) => protocol_url,
-                None => {
-                    let Some(protocol_url) = conf.protocols.get(&service_config.protocol) else {
-                                    tracing::warn!("protocol: {:?} not exists", service_config.protocol);
-                                    continue;
-                                };
-                    protocol_url
-                }
+                let protocol = root_config
+                    .protocols
+                    .get_protocol_or_default(service_config.protocol.as_str());
+                let protocol_url =
+                    format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
+                info!("protocol_url: {:?}", protocol_url);
+                Url::from_url(&protocol_url)
+            } else {
+                return Err(format!("protocol {:?} not exists", service_config.protocol).into());
             };
-            // let protocol_url = format!(
-            //     "{}/{}/{}",
-            //     &protocol_url.to_url(),
-            //     service_config.name,
-            //     service_name
-            // );
-            // service_names may be multiple
-            let protocol_url = protocol_url
-                .to_owned()
-                .add_param("service_names".to_string(), service_name.to_string());
-            let protocol_url = protocol_url.to_url();
-            tracing::info!("url: {}", protocol_url);
-
-            let protocol_url = Url::from_url(&protocol_url)
-                .expect(&format!("protocol url: {protocol_url} parse failed."));
-            self.protocols
-                .entry(service_config.name.clone())
-                .and_modify(|urls| urls.push(protocol_url.clone()))
-                .or_insert(vec![protocol_url]);
-
-            tracing::debug!(
-                "service name: {service_name}, service_config: {:?}",
-                service_config
-            );
-            let registry = &service_config.registry;
-            let reg_url = self
-                .registries
-                .get(registry)
-                .expect(&format!("can't find the registry: {registry}"));
-            self.service_registry
-                .entry(service_config.name.clone())
-                .and_modify(|urls| urls.push(reg_url.to_owned()))
-                .or_insert(vec![reg_url.to_owned()]);
+            info!("url: {:?}", url);
+            if url.is_none() {
+                continue;
+            }
+            let u = url.unwrap();
+            if self.protocols.get(&service_config.protocol).is_some() {
+                self.protocols
+                    .get_mut(&service_config.protocol)
+                    .unwrap()
+                    .push(u);
+            } else {
+                self.protocols
+                    .insert(service_config.protocol.clone(), vec![u]);
+            }
         }
+        Ok(())
     }
 
     pub async fn start(&mut self) {
-        self.init();
-
+        self.init().unwrap();
+        info!("starting...");
         // TODO: server registry
-
-        let mem_reg =
-            Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
+        let mem_reg = Box::new(
+            RegistryProtocol::new()
+                .with_registries(self.registries.as_ref().unwrap().clone())
+                .with_services(self.service_registry.clone()),
+        );
         let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
         for (name, items) in self.protocols.iter() {
             for url in items.iter() {
-                tracing::info!("protocol: {:?}, service url: {:?}", name, url);
+                info!("protocol: {:?}, service url: {:?}", name, url);
                 let exporter = mem_reg.clone().export(url.to_owned());
-                async_vec.push(exporter)
+                async_vec.push(exporter);
+                //TODO multiple registry
+                if self.registries.is_some() {
+                    self.registries
+                        .as_ref()
+                        .unwrap()
+                        .default_registry()
+                        .register(url.clone())
+                        .unwrap();
+                }
             }
         }
 
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 423e9c7..bdd152b 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
+use std::{collections::HashMap, fmt::Debug, str::FromStr};
+
 use futures_core::Stream;
-use std::{collections::HashMap, str::FromStr};
 
 pub struct Request<T> {
     pub message: T,
@@ -202,7 +203,7 @@ pub struct RpcInvocation {
 }
 
 impl RpcInvocation {
-    pub fn with_servie_unique_name(mut self, service_unique_name: String) -> Self {
+    pub fn with_service_unique_name(mut self, service_unique_name: String) -> Self {
         self.target_service_unique_name = service_unique_name;
         self
     }
@@ -210,6 +211,9 @@ impl RpcInvocation {
         self.method_name = method_name;
         self
     }
+    pub fn unique_fingerprint(&self) -> String {
+        format!("{}#{}", self.target_service_unique_name, self.method_name)
+    }
 }
 
 impl Invocation for RpcInvocation {
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 656da2a..2c8ad8f 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-pub mod server_desc;
-pub mod triple;
-
 use std::{
+    fmt::Debug,
     future::Future,
     task::{Context, Poll},
 };
@@ -29,6 +27,9 @@ use tower_service::Service;
 
 use crate::common::url::Url;
 
+pub mod server_desc;
+pub mod triple;
+
 #[async_trait]
 pub trait Protocol {
     type Invoker;
@@ -42,7 +43,7 @@ pub trait Exporter {
     fn unexport(&self);
 }
 
-pub trait Invoker<ReqBody> {
+pub trait Invoker<ReqBody>: Debug {
     type Response;
 
     type Error;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index df4d2ae..2bcc2d3 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,17 +15,11 @@
  * limitations under the License.
  */
 
-use std::str::FromStr;
-
 use aws_smithy_http::body::SdkBody;
+use std::fmt::{Debug, Formatter};
 use tower_service::Service;
 
-use crate::{
-    common::url::Url,
-    protocol::Invoker,
-    triple::{client::builder::ClientBoxService, transport::connection::Connection},
-    utils::boxed::BoxService,
-};
+use crate::{common::url::Url, protocol::Invoker, triple::client::builder::ClientBoxService};
 
 pub struct TripleInvoker {
     url: Url,
@@ -33,13 +27,18 @@ pub struct TripleInvoker {
 }
 
 impl TripleInvoker {
-    pub fn new(url: Url) -> TripleInvoker {
-        let uri = http::Uri::from_str(&url.to_url()).unwrap();
-        let conn = Connection::new().with_host(uri);
-        Self {
-            url,
-            conn: BoxService::new(conn),
-        }
+    // pub fn new(url: Url) -> TripleInvoker {
+    //     let uri = http::Uri::from_str(&url.to_url()).unwrap();
+    //     Self {
+    //         url,
+    //         conn: ClientBuilder::from_uri(&uri).build()connect(),
+    //     }
+    // }
+}
+
+impl Debug for TripleInvoker {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(format!("{:?}", self.url).as_str())
     }
 }
 
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 3577f52..c6ade9b 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -47,7 +47,7 @@ impl TripleProtocol {
 
     pub fn get_server(&self, url: Url) -> Option<TripleServer> {
         self.servers
-            .get(&url.service_key.join(","))
+            .get(&url.service_key)
             .map(|data| data.to_owned())
     }
 }
@@ -61,10 +61,10 @@ impl Protocol for TripleProtocol {
     }
 
     async fn export(mut self, url: Url) -> BoxExporter {
+        // service_key is same to key of TRIPLE_SERVICES
         let server = TripleServer::new();
-        self.servers
-            .insert(url.service_key.join(","), server.clone());
-        server.serve(url).await;
+        self.servers.insert(url.service_key.clone(), server.clone());
+        server.serve(url.short_url().as_str().into()).await;
 
         Box::new(TripleExporter::new())
     }
diff --git a/config/src/lib.rs b/dubbo/src/registry/integration.rs
similarity index 75%
copy from config/src/lib.rs
copy to dubbo/src/registry/integration.rs
index 70a33cd..15b82d0 100644
--- a/config/src/lib.rs
+++ b/dubbo/src/registry/integration.rs
@@ -14,19 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use crate::{cluster::support::cluster_invoker::ClusterInvoker, registry::BoxRegistry};
+use std::sync::Arc;
 
-pub mod config;
-pub mod protocol;
-pub mod provider;
-pub mod service;
-
-pub use config::*;
-
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
-    }
+pub trait ClusterRegistryIntegration {
+    /// get cluster invoker struct
+    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>>;
 }
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index 29e0ec2..61d3e41 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -16,10 +16,12 @@
  */
 
 #![allow(unused_variables, dead_code, missing_docs)]
+
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
+use tracing::debug;
 
 use crate::common::url::Url;
 
@@ -47,9 +49,9 @@ impl MemoryRegistry {
 impl Registry for MemoryRegistry {
     type NotifyListener = MemoryNotifyListener;
 
-    fn register(&mut self, mut url: crate::common::url::Url) -> Result<(), crate::StdError> {
+    fn register(&mut self, mut url: Url) -> Result<(), crate::StdError> {
         // define provider label: ${registry.group}/${service_name}/provider
-        let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+        let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
             Some(key) => key,
             None => "dubbo".to_string(),
         };
@@ -57,20 +59,20 @@ impl Registry for MemoryRegistry {
         let dubbo_path = format!(
             "/{}/{}/{}",
             registry_group,
-            url.get_service_name().join(","),
+            url.get_service_name(),
             "provider",
         );
 
         url.params.insert("anyhost".to_string(), "true".to_string());
         // define triple url path
-        let raw_url = format!("{}?{}", url.to_url(), url.encode_param(),);
+        let raw_url = url.raw_url_string();
 
         self.registries.write().unwrap().insert(dubbo_path, raw_url);
         Ok(())
     }
 
     fn unregister(&mut self, url: crate::common::url::Url) -> Result<(), crate::StdError> {
-        let registry_group = match url.get_param(REGISTRY_GROUP_KEY.to_string()) {
+        let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
             Some(key) => key,
             None => "dubbo".to_string(),
         };
@@ -78,7 +80,7 @@ impl Registry for MemoryRegistry {
         let dubbo_path = format!(
             "/{}/{}/{}",
             registry_group,
-            url.get_service_name().join(","),
+            url.get_service_name(),
             "provider",
         );
         self.registries.write().unwrap().remove(&dubbo_path);
@@ -109,6 +111,7 @@ pub struct MemoryNotifyListener {
 
 impl NotifyListener for MemoryNotifyListener {
     fn notify(&self, event: super::ServiceEvent) {
+        debug!("notify {:?}", event);
         let mut map = self.service_instances.write().expect("msg");
         match event.action.as_str() {
             "ADD" => map.insert(event.key, event.service),
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 8c56692..6352cf1 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -16,12 +16,14 @@
  */
 
 #![allow(unused_variables, dead_code, missing_docs)]
+pub mod integration;
 pub mod memory_registry;
 pub mod protocol;
+pub mod types;
 
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
 
-use crate::common::url::Url;
+use crate::{common::url::Url, registry::memory_registry::MemoryNotifyListener};
 
 pub trait Registry {
     type NotifyListener;
@@ -38,18 +40,24 @@ pub trait NotifyListener {
     fn notify_all(&self, event: ServiceEvent);
 }
 
+#[derive(Debug)]
 pub struct ServiceEvent {
     pub key: String,
     pub action: String,
     pub service: Vec<Url>,
 }
 
-pub type BoxRegistry =
-    Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> + Send + Sync>;
+pub type BoxRegistry = Box<dyn Registry<NotifyListener = MemoryNotifyListener> + Send + Sync>;
+
+impl Debug for BoxRegistry {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str("BoxRegistry")
+    }
+}
 
 #[derive(Default)]
 pub struct RegistryWrapper {
-    pub registry: Option<Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener>>>,
+    pub registry: Option<Box<dyn Registry<NotifyListener = MemoryNotifyListener>>>,
 }
 
 impl Clone for RegistryWrapper {
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 45a24cb..04b8ac9 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -17,7 +17,8 @@
 
 use std::{
     collections::HashMap,
-    sync::{Arc, RwLock},
+    fmt::{Debug, Formatter},
+    sync::{Arc, Mutex, RwLock},
 };
 
 use super::{memory_registry::MemoryRegistry, BoxRegistry};
@@ -27,27 +28,46 @@ use crate::{
         triple::{triple_exporter::TripleExporter, 
triple_protocol::TripleProtocol},
         BoxExporter, BoxInvoker, Protocol,
     },
+    registry::types::Registries,
 };
 
 #[derive(Clone, Default)]
 pub struct RegistryProtocol {
     // registerAddr: Registry
-    registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
+    registries: Option<Registries>,
     // providerUrl: Exporter
     exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
     // serviceName: registryUrls
     services: HashMap<String, Vec<Url>>,
 }
 
+impl Debug for RegistryProtocol {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(
+            format!(
+                "RegistryProtocol services{:#?},registries,{:#?}",
+                self.services,
+                self.registries.clone()
+            )
+            .as_str(),
+        )
+    }
+}
+
 impl RegistryProtocol {
     pub fn new() -> Self {
         RegistryProtocol {
-            registries: Arc::new(RwLock::new(HashMap::new())),
+            registries: None,
             exporters: Arc::new(RwLock::new(HashMap::new())),
             services: HashMap::new(),
         }
     }
 
+    pub fn with_registries(mut self, registries: Registries) -> Self {
+        self.registries = Some(registries);
+        self
+    }
+
     pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> 
Self {
         self.services.extend(services);
         self
@@ -56,9 +76,11 @@ impl RegistryProtocol {
     pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
         let mem = MemoryRegistry::default();
         self.registries
-            .write()
+            .as_ref()
+            .unwrap()
+            .lock()
             .unwrap()
-            .insert(url.location, Box::new(mem.clone()));
+            .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
 
         Box::new(mem)
     }
@@ -78,23 +100,23 @@ impl Protocol for RegistryProtocol {
         // init Exporter based on provider_url
         // server registry based on register_url
         // start server health check
-        let registry_url = 
self.services.get(url.get_service_name().join(",").as_str());
+        let registry_url = self.services.get(url.get_service_name().as_str());
         if let Some(urls) = registry_url {
             for url in urls.clone().iter() {
-                if !url.protocol.is_empty() {
+                if !url.service_key.is_empty() {
                     let mut reg = self.get_registry(url.clone());
                     reg.register(url.clone()).unwrap();
                 }
             }
         }
 
-        match url.clone().protocol.as_str() {
-            "triple" => {
+        match url.clone().scheme.as_str() {
+            "tri" => {
                 let pro = Box::new(TripleProtocol::new());
                 return pro.export(url).await;
             }
             _ => {
-                tracing::error!("protocol {:?} not implemented", url.protocol);
+                tracing::error!("protocol {:?} not implemented", url.scheme);
                 Box::new(TripleExporter::new())
             }
         }
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
new file mode 100644
index 0000000..77d253a
--- /dev/null
+++ b/dubbo/src/registry/types.rs
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex},
+};
+
+use itertools::Itertools;
+use tracing::info;
+
+use crate::{
+    common::url::Url,
+    registry::{memory_registry::MemoryNotifyListener, BoxRegistry, Registry},
+    StdError,
+};
+
+pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
+pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+
+pub const DEFAULT_REGISTRY_KEY: &str = "default";
+
+pub trait RegistriesOperation {
+    fn get(&self, registry_key: &str) -> SafeRegistry;
+    fn insert(&self, registry_key: String, registry: SafeRegistry);
+    fn default_registry(&self) -> SafeRegistry;
+}
+
+impl RegistriesOperation for Registries {
+    fn get(&self, registry_key: &str) -> SafeRegistry {
+        self.as_ref()
+            .lock()
+            .unwrap()
+            .get(registry_key)
+            .unwrap()
+            .clone()
+    }
+
+    fn insert(&self, registry_key: String, registry: SafeRegistry) {
+        self.as_ref().lock().unwrap().insert(registry_key, registry);
+    }
+
+    fn default_registry(&self) -> SafeRegistry {
+        let guard = self.as_ref().lock().unwrap();
+        let (_, result) = guard
+            .iter()
+            .find_or_first(|e| e.0 == DEFAULT_REGISTRY_KEY)
+            .unwrap()
+            .to_owned();
+        result.clone()
+    }
+}
+
+impl Registry for SafeRegistry {
+    type NotifyListener = MemoryNotifyListener;
+
+    fn register(&mut self, url: Url) -> Result<(), StdError> {
+        info!("register {}.", url);
+        self.lock().unwrap().register(url).expect("registry err.");
+        Ok(())
+    }
+
+    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+        self.lock().unwrap().register(url).expect("registry err.");
+        Ok(())
+    }
+
+    fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> 
Result<(), StdError> {
+        self.lock().unwrap().register(url).expect("registry err.");
+        Ok(())
+    }
+
+    fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> 
Result<(), StdError> {
+        self.lock().unwrap().register(url).expect("registry err.");
+        Ok(())
+    }
+}
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index 4746249..cf667cc 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -16,8 +16,10 @@
  */
 
 use crate::{
-    cluster::directory::StaticDirectory, codegen::Directory,
-    triple::compression::CompressionEncoding, utils::boxed::BoxService,
+    cluster::directory::StaticDirectory,
+    codegen::{ClusterInvoker, Directory, RegistryDirectory},
+    triple::compression::CompressionEncoding,
+    utils::boxed::BoxService,
 };
 
 use aws_smithy_http::body::SdkBody;
@@ -32,6 +34,7 @@ pub struct ClientBuilder {
     pub timeout: Option<u64>,
     pub connector: &'static str,
     directory: Option<Box<dyn Directory>>,
+    cluster_invoker: Option<ClusterInvoker>,
 }
 
 impl ClientBuilder {
@@ -40,6 +43,7 @@ impl ClientBuilder {
             timeout: None,
             connector: "",
             directory: None,
+            cluster_invoker: None,
         }
     }
 
@@ -48,6 +52,7 @@ impl ClientBuilder {
             timeout: None,
             connector: "",
             directory: Some(Box::new(StaticDirectory::new(&host))),
+            cluster_invoker: None,
         }
     }
 
@@ -56,6 +61,7 @@ impl ClientBuilder {
             timeout: None,
             connector: "",
             directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
+            cluster_invoker: None,
         }
     }
 
@@ -70,6 +76,15 @@ impl ClientBuilder {
     pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
         Self {
             directory: Some(directory),
+            cluster_invoker: None,
+            ..self
+        }
+    }
+
+    pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self {
+        Self {
+            directory: None,
+            cluster_invoker: Some(ClusterInvoker::with_directory(registry)),
             ..self
         }
     }
@@ -84,6 +99,7 @@ impl ClientBuilder {
     pub fn with_connector(self, connector: &'static str) -> Self {
         Self {
             connector: connector,
+            cluster_invoker: None,
             ..self
         }
     }
@@ -92,6 +108,7 @@ impl ClientBuilder {
         TripleClient {
             send_compression_encoding: Some(CompressionEncoding::Gzip),
             directory: self.directory,
+            cluster_invoker: self.cluster_invoker,
         }
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index 454e953..56edb96 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-use std::str::FromStr;
+use std::{str::FromStr, sync::Arc};
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 
@@ -25,12 +25,10 @@ use rand::prelude::SliceRandom;
 use tower_service::Service;
 
 use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::{
-    cluster::{FailoverCluster, MockDirectory},
-    codegen::{Directory, RpcInvocation},
-};
+use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
 
 use crate::{
+    cluster::support::cluster_invoker::ClusterRequestBuilder,
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, 
encode::encode},
 };
@@ -39,6 +37,7 @@ use crate::{
 pub struct TripleClient {
     pub(crate) send_compression_encoding: Option<CompressionEncoding>,
     pub(crate) directory: Option<Box<dyn Directory>>,
+    pub(crate) cluster_invoker: Option<ClusterInvoker>,
 }
 
 impl TripleClient {
@@ -52,7 +51,14 @@ impl TripleClient {
         builder.build()
     }
 
-    fn map_request(
+    pub fn with_cluster(self, invoker: ClusterInvoker) -> Self {
+        TripleClient {
+            cluster_invoker: Some(invoker),
+            ..self
+        }
+    }
+
+    pub fn map_request(
         &self,
         uri: http::Uri,
         path: http::uri::PathAndQuery,
@@ -144,19 +150,28 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
-        let bytes = hyper::body::to_bytes(body).await.unwrap();
-        let sdk_body = SdkBody::from(bytes);
-
-        let url_list = self.directory.as_ref().expect("msg").list(invocation);
-        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let sdk_body = SdkBody::from(body);
+        let arc_invocation = Arc::new(invocation);
+        let req;
+        let http_uri;
+        if self.cluster_invoker.is_some() {
+            let cluster_invoker = 
self.cluster_invoker.as_ref().unwrap().clone();
+            req = cluster_invoker.build_req(self, path, 
arc_invocation.clone(), sdk_body);
+            http_uri = req.uri().clone();
+        } else {
+            let url_list = self
+                .directory
+                .as_ref()
+                .expect("msg")
+                .list(arc_invocation.clone());
+            let real_url = url_list.choose(&mut 
rand::thread_rng()).expect("msg");
+            http_uri =
+                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+            req = self.map_request(http_uri.clone(), path, sdk_body);
+        }
 
-        // let mut conn = Connection::new().with_host(http_uri);
-        let mut cluster = FailoverCluster::new(Box::new(MockDirectory {}));
-        let response = cluster
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
@@ -210,14 +225,24 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-
-        let url_list = self.directory.as_ref().expect("msg").list(invocation);
-        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
-
+        let arc_invocation = Arc::new(invocation);
+        let req;
+        let http_uri;
+        if self.cluster_invoker.is_some() {
+            let cluster_invoker = 
self.cluster_invoker.as_ref().unwrap().clone();
+            req = cluster_invoker.build_req(self, path, 
arc_invocation.clone(), sdk_body);
+            http_uri = req.uri().clone();
+        } else {
+            let url_list = self
+                .directory
+                .as_ref()
+                .expect("msg")
+                .list(arc_invocation.clone());
+            let real_url = url_list.choose(&mut 
rand::thread_rng()).expect("msg");
+            http_uri =
+                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+            req = self.map_request(http_uri.clone(), path, sdk_body);
+        }
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
@@ -257,14 +282,24 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-
-        let url_list = self.directory.as_ref().expect("msg").list(invocation);
-        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
-
+        let arc_invocation = Arc::new(invocation);
+        let req;
+        let http_uri;
+        if self.cluster_invoker.is_some() {
+            let cluster_invoker = 
self.cluster_invoker.as_ref().unwrap().clone();
+            req = cluster_invoker.build_req(self, path, 
arc_invocation.clone(), sdk_body);
+            http_uri = req.uri().clone();
+        } else {
+            let url_list = self
+                .directory
+                .as_ref()
+                .expect("msg")
+                .list(arc_invocation.clone());
+            let real_url = url_list.choose(&mut 
rand::thread_rng()).expect("msg");
+            http_uri =
+                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+            req = self.map_request(http_uri.clone(), path, sdk_body);
+        }
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
             .call(req)
@@ -320,13 +355,24 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
         let sdk_body = SdkBody::from(body);
-
-        let url_list = self.directory.as_ref().expect("msg").list(invocation);
-        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
-        let http_uri =
-            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
-
-        let req = self.map_request(http_uri.clone(), path, sdk_body);
+        let arc_invocation = Arc::new(invocation);
+        let req;
+        let http_uri;
+        if self.cluster_invoker.is_some() {
+            let cluster_invoker = 
self.cluster_invoker.as_ref().unwrap().clone();
+            req = cluster_invoker.build_req(self, path, 
arc_invocation.clone(), sdk_body);
+            http_uri = req.uri().clone();
+        } else {
+            let url_list = self
+                .directory
+                .as_ref()
+                .expect("msg")
+                .list(arc_invocation.clone());
+            let real_url = url_list.choose(&mut 
rand::thread_rng()).expect("msg");
+            http_uri =
+                http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
+            req = self.map_request(http_uri.clone(), path, sdk_body);
+        }
 
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
diff --git a/dubbo/src/triple/compression.rs b/dubbo/src/triple/compression.rs
index 5e3ee86..9147b84 100644
--- a/dubbo/src/triple/compression.rs
+++ b/dubbo/src/triple/compression.rs
@@ -113,12 +113,12 @@ fn test_compress() {
     let len = src.len();
     src.reserve(len);
 
-    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
+    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len).unwrap();
     println!("src: {:?}, dst: {:?}", src, dst);
 
     let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
     let de_len = dst.len();
-    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
+    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, 
de_len).unwrap();
 
     println!("src: {:?}, dst: {:?}", dst, de_dst);
 }
diff --git a/dubbo/src/triple/server/builder.rs 
b/dubbo/src/triple/server/builder.rs
index d1bcbcc..af3cc6d 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -90,14 +90,14 @@ impl ServerBuilder {
     }
 
     pub async fn serve(self) -> Result<(), crate::Error> {
-        tracing::info!("server starting. addr: {:?}", self.addr);
+        tracing::info!("server starting. addr: {:?}", self.addr.unwrap());
         self.server.serve(self.addr.unwrap()).await
     }
 }
 
 impl From<Url> for ServerBuilder {
     fn from(u: Url) -> Self {
-        let uri = match http::Uri::from_str(&u.to_url()) {
+        let uri = match http::Uri::from_str(&u.raw_url_string()) {
             Ok(v) => v,
             Err(err) => {
                 tracing::error!("http uri parse error: {}, url: {:?}", err, 
&u);
@@ -108,9 +108,9 @@ impl From<Url> for ServerBuilder {
         let authority = uri.authority().unwrap();
 
         Self {
-            listener: u.get_param("listener".to_string()).unwrap(),
+            listener: u.get_param("listener").unwrap_or("tcp".to_string()),
             addr: authority.to_string().to_socket_addrs().unwrap().next(),
-            service_names: u.service_key,
+            service_names: vec![u.service_name],
             server: DubboServer::default(),
         }
     }
diff --git a/dubbo/src/triple/transport/connection.rs 
b/dubbo/src/triple/transport/connection.rs
index fe9cc16..c99b399 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -19,6 +19,7 @@ use std::task::Poll;
 
 use hyper::client::{conn::Builder, service::Connect};
 use tower_service::Service;
+use tracing::debug;
 
 use crate::{boxed, triple::transport::connector::get_connector};
 
@@ -84,6 +85,7 @@ where
         let mut connector = Connect::new(get_connector(self.connector), 
builder);
         let uri = self.host.clone();
         let fut = async move {
+            debug!("send rpc call to {}", uri);
             let mut con = connector.call(uri).await.unwrap();
 
             con.call(req)
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index 73c6da2..4a40d66 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -103,25 +103,8 @@ impl Echo for EchoServerImpl {
         }))
     }
 
-    async fn client_streaming_echo(
-        &self,
-        req: Request<Decoding<EchoRequest>>,
-    ) -> Result<Response<EchoResponse>, dubbo::status::Status> {
-        let mut s = req.into_inner();
-        loop {
-            let result = s.next().await;
-            match result {
-                Some(Ok(val)) => println!("result: {:?}", val),
-                Some(Err(val)) => println!("err: {:?}", val),
-                None => break,
-            }
-        }
-        Ok(Response::new(EchoResponse {
-            message: "hello client streaming".to_string(),
-        }))
-    }
-
     type ServerStreamingEchoStream = ResponseStream;
+
     async fn server_streaming_echo(
         &self,
         req: Request<EchoRequest>,
@@ -143,6 +126,23 @@ impl Echo for EchoServerImpl {
 
         Ok(Response::new(Box::pin(resp)))
     }
+    async fn client_streaming_echo(
+        &self,
+        req: Request<Decoding<EchoRequest>>,
+    ) -> Result<Response<EchoResponse>, dubbo::status::Status> {
+        let mut s = req.into_inner();
+        loop {
+            let result = s.next().await;
+            match result {
+                Some(Ok(val)) => println!("result: {:?}", val),
+                Some(Err(val)) => println!("err: {:?}", val),
+                None => break,
+            }
+        }
+        Ok(Response::new(EchoResponse {
+            message: "hello client streaming".to_string(),
+        }))
+    }
 
     type BidirectionalStreamingEchoStream = ResponseStream;
 
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs 
b/examples/echo/src/generated/grpc.examples.echo.rs
index aa5d82b..c095ddf 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -31,6 +31,10 @@ pub mod echo_client {
                 inner: TripleClient::new(builder),
             }
         }
+        pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
+            self.inner = self.inner.with_cluster(invoker);
+            self
+        }
         /// UnaryEcho is unary echo.
         pub async fn unary_echo(
             &mut self,
@@ -41,7 +45,7 @@ pub mod echo_client {
                 super::EchoResponse,
             >::default();
             let invocation = RpcInvocation::default()
-                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("UnaryEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/UnaryEcho",
@@ -58,7 +62,7 @@ pub mod echo_client {
                 super::EchoResponse,
             >::default();
             let invocation = RpcInvocation::default()
-                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
@@ -75,7 +79,7 @@ pub mod echo_client {
                 super::EchoResponse,
             >::default();
             let invocation = RpcInvocation::default()
-                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
@@ -92,7 +96,7 @@ pub mod echo_client {
                 super::EchoResponse,
             >::default();
             let invocation = RpcInvocation::default()
-                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index c4b20cc..c34b19f 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -24,7 +24,6 @@ async-trait = "0.1.56"
 tokio-stream = "0.1"
 tracing = "0.1"
 tracing-subscriber = "0.2.0"
-
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
 dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = 
"0.2.0"}
diff --git a/examples/greeter/dubbo.yaml b/examples/greeter/dubbo.yaml
index b6552f8..96daf95 100644
--- a/examples/greeter/dubbo.yaml
+++ b/examples/greeter/dubbo.yaml
@@ -1,17 +1,17 @@
-name: dubbo
-provider:
-  services:
-    org.apache.dubbo.sample.tri.Greeter:
-      version: 1.0.0
-      group: test
-      protocol: triple
-      registry: 'zookeeper'
-      serializer: json
-registries:
-  zookeeper: zookeeper://localhost:2181/
-protocols:
-  triple:
-    ip: 0.0.0.0
-    port: '8888'
-    name: triple
-    listener: tcp
+dubbo:
+  protocols:
+    triple:
+      ip: 0.0.0.0
+      port: '8888'
+      name: tri
+  registries:
+    demoZK:
+      protocol: zookeeper
+      address: 10.0.6.6:2181
+  provider:
+    services:
+      GreeterProvider:
+        version: 1.0.0
+        group: test
+        protocol: triple
+        interface: org.apache.dubbo.sample.tri.Greeter
\ No newline at end of file
diff --git a/examples/greeter/src/greeter/client.rs 
b/examples/greeter/src/greeter/client.rs
index c493d60..d620843 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,6 +21,7 @@ pub mod protos {
 }
 
 use dubbo::codegen::*;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
 use tracing::Level;
@@ -38,7 +39,7 @@ async fn main() {
 
     tracing::subscriber::set_global_default(subscriber).expect("setting 
default subscriber failed");
 
-    let mut cli = GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888"));
+    // let mut cli = GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888"));
 
     // Here is example for zk
     // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
@@ -49,6 +50,10 @@ async fn main() {
     // let directory = RegistryDirectory::new(Box::new(zkr));
     // cli = cli.with_directory(Box::new(directory));
 
+    let zkr = ZookeeperRegistry::default();
+    let directory = RegistryDirectory::new(Box::new(zkr));
+    let mut cli = 
GreeterClient::new(ClientBuilder::new().with_registry_directory(directory));
+    // using loop for loadbalance test
     println!("# unary call");
     let resp = cli
         .greet(Request::new(GreeterRequest {
diff --git a/examples/greeter/src/greeter/server.rs 
b/examples/greeter/src/greeter/server.rs
index 37de662..271a54b 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -15,47 +15,48 @@
  * limitations under the License.
  */
 
-pub mod protos {
-    #![allow(non_camel_case_types)]
-    include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
-}
-
-use futures_util::StreamExt;
-use protos::{
-    greeter_server::{register_server, Greeter},
-    GreeterReply, GreeterRequest,
-};
-
 use std::{io::ErrorKind, pin::Pin};
 
 use async_trait::async_trait;
-use futures_util::Stream;
+use futures_util::{Stream, StreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
+use tracing::info;
 
 use dubbo::{codegen::*, Dubbo};
 use dubbo_config::RootConfig;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+use protos::{
+    greeter_server::{register_server, Greeter},
+    GreeterReply, GreeterRequest,
+};
+
+pub mod protos {
+    #![allow(non_camel_case_types)]
+    include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
+}
 
 type ResponseStream =
     Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + 
Send>>;
 
 #[tokio::main]
 async fn main() {
+    use tracing::{span, Level};
+    let span = span!(Level::DEBUG, "greeter.server");
+    let _enter = span.enter();
     register_server(GreeterServerImpl {
         name: "greeter".to_string(),
     });
-
-    // Dubbo::new().start().await;
-    Dubbo::new()
-        .with_config({
-            let r = RootConfig::new();
-            match r.load() {
-                Ok(config) => config,
-                Err(_err) => panic!("err: {:?}", _err), // response was droped
-            }
-        })
-        .start()
-        .await;
+    let zkr = ZookeeperRegistry::default();
+    let r = RootConfig::new();
+    let r = match r.load() {
+        Ok(config) => config,
+        Err(_err) => panic!("err: {:?}", _err), // response was droped
+    };
+    let mut f = Dubbo::new()
+        .with_config(r)
+        .add_registry("zookeeper", Box::new(zkr));
+    f.start().await;
 }
 
 #[allow(dead_code)]
@@ -71,7 +72,7 @@ impl Greeter for GreeterServerImpl {
         &self,
         request: Request<GreeterRequest>,
     ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
-        println!("GreeterServer::greet {:?}", request.metadata);
+        info!("GreeterServer::greet {:?}", request.metadata);
 
         Ok(Response::new(GreeterReply {
             message: "hello, dubbo-rust".to_string(),
diff --git a/registry-nacos/src/nacos_registry.rs 
b/registry-nacos/src/nacos_registry.rs
index a020e6b..042d6f2 100644
--- a/registry-nacos/src/nacos_registry.rs
+++ b/registry-nacos/src/nacos_registry.rs
@@ -142,9 +142,9 @@ impl Registry for NacosRegistry {
     type NotifyListener = Arc<dyn NotifyListener + Sync + Send + 'static>;
 
     fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
-        let side = url.get_param(SIDE_KEY.to_owned()).unwrap_or_default();
+        let side = url.get_param(SIDE_KEY).unwrap_or_default();
         let register_consumer = url
-            .get_param(REGISTER_CONSUMER_URL_KEY.to_owned())
+            .get_param(REGISTER_CONSUMER_URL_KEY)
             .unwrap_or_else(|| false.to_string())
             .parse::<bool>()
             .unwrap_or(false);
@@ -313,21 +313,17 @@ struct NacosServiceName {
 
 impl NacosServiceName {
     fn new(url: &Url) -> NacosServiceName {
-        let service_interface = url
-            .get_service_name()
-            .into_iter()
-            .next()
-            .unwrap_or_default();
+        let service_interface = url.get_service_name();
 
-        let category = 
url.get_param(CATEGORY_KEY.to_owned()).unwrap_or_default();
+        let category = url.get_param(CATEGORY_KEY).unwrap_or_default();
 
-        let version = 
url.get_param(VERSION_KEY.to_owned()).unwrap_or_default();
+        let version = url.get_param(VERSION_KEY).unwrap_or_default();
 
-        let group = url.get_param(GROUP_KEY.to_owned()).unwrap_or_default();
+        let group = url.get_param(GROUP_KEY).unwrap_or_default();
 
         Self {
             category,
-            service_interface,
+            service_interface: service_interface.clone(),
             version,
             group,
         }
diff --git a/registry-nacos/src/utils/mod.rs b/registry-nacos/src/utils/mod.rs
index 94f905a..2cca6a2 100644
--- a/registry-nacos/src/utils/mod.rs
+++ b/registry-nacos/src/utils/mod.rs
@@ -40,7 +40,7 @@ pub(crate) fn build_nacos_client_props(url: &Url) -> 
(nacos_sdk::api::props::Cli
     let host = &url.ip;
     let port = &url.port;
     let backup = url
-        .get_param(BACKUP_KEY.to_owned())
+        .get_param(BACKUP_KEY)
         .map(|mut data| {
             data.insert(0, ',');
             data
@@ -49,13 +49,13 @@ pub(crate) fn build_nacos_client_props(url: &Url) -> 
(nacos_sdk::api::props::Cli
     let server_addr = format!("{}:{}{}", host, port, backup);
 
     let namespace = url
-        .get_param(NAMESPACE_KEY.to_owned())
-        .unwrap_or_else(|| DEFAULT_NAMESPACE.to_owned());
+        .get_param(NAMESPACE_KEY)
+        .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
     let app_name = url
-        .get_param(APP_NAME_KEY.to_owned())
-        .unwrap_or_else(|| UNKNOWN_APP.to_owned());
-    let username = url.get_param(USERNAME_KEY.to_owned()).unwrap_or_default();
-    let password = url.get_param(PASSWORD_KEY.to_owned()).unwrap_or_default();
+        .get_param(APP_NAME_KEY)
+        .unwrap_or_else(|| UNKNOWN_APP.to_string());
+    let username = url.get_param(USERNAME_KEY).unwrap_or_default();
+    let password = url.get_param(PASSWORD_KEY).unwrap_or_default();
 
     let enable_auth = !password.is_empty() && !username.is_empty();
 
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
index cf040fa..722462a 100644
--- a/registry-zookeeper/Cargo.toml
+++ b/registry-zookeeper/Cargo.toml
@@ -7,8 +7,9 @@ license = "Apache-2.0"
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-zookeeper = "0.6.1"
+zookeeper = "0.7.0"
 dubbo = {path = "../dubbo/", version = "0.2.0"}
 serde_json = "1.0"
 serde = {version = "1.0.145",features = ["derive"]}
 tracing = "0.1"
+urlencoding = "2.1.2"
\ No newline at end of file
diff --git a/registry-zookeeper/src/zookeeper_registry.rs 
b/registry-zookeeper/src/zookeeper_registry.rs
index be4e24b..a9fc9ec 100644
--- a/registry-zookeeper/src/zookeeper_registry.rs
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -17,24 +17,36 @@
 
 #![allow(unused_variables, dead_code, missing_docs)]
 
-use dubbo::{
-    common::url::Url,
-    registry::{memory_registry::MemoryNotifyListener, NotifyListener, 
Registry, ServiceEvent},
-    StdError,
-};
-use serde::{Deserialize, Serialize};
 use std::{
     collections::{HashMap, HashSet},
-    sync::{Arc, RwLock},
+    env,
+    sync::{Arc, Mutex, RwLock},
     time::Duration,
 };
-use tracing::info;
 
-use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+use serde::{Deserialize, Serialize};
+use tracing::{error, info};
+#[allow(unused_imports)]
+use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, 
ZooKeeper};
+
+use dubbo::{
+    cluster::support::cluster_invoker::ClusterInvoker,
+    codegen::BoxRegistry,
+    common::{
+        consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
+        url::Url,
+    },
+    registry::{
+        integration::ClusterRegistryIntegration,
+        memory_registry::{MemoryNotifyListener, MemoryRegistry},
+        NotifyListener, Registry, ServiceEvent,
+    },
+    StdError,
+};
 
-// extract service registry metadata from url
-/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
-/// dubboPath = fmt.Sprintf("/%s/%s/%s", 
r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), 
common.DubboNodes[common.PROVIDER])
+// extract service registry metadata from url
+// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
 
 pub const REGISTRY_GROUP_KEY: &str = "registry.group";
 
@@ -49,20 +61,8 @@ impl Watcher for LoggingWatcher {
 pub struct ZookeeperRegistry {
     root_path: String,
     zk_client: Arc<ZooKeeper>,
-
     listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as 
Registry>::NotifyListener>>>,
-}
-
-pub struct MyNotifyListener {}
-
-impl NotifyListener for MyNotifyListener {
-    fn notify(&self, event: dubbo::registry::ServiceEvent) {
-        todo!()
-    }
-
-    fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
-        todo!()
-    }
+    memory_registry: Arc<Mutex<MemoryRegistry>>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -90,11 +90,12 @@ impl ZookeeperRegistry {
     pub fn new(connect_string: &str) -> ZookeeperRegistry {
         let zk_client =
             ZooKeeper::connect(connect_string, Duration::from_secs(15), 
LoggingWatcher).unwrap();
+        info!("zk server connect string: {}", connect_string);
         ZookeeperRegistry {
             root_path: "/services".to_string(),
             zk_client: Arc::new(zk_client),
-
             listeners: RwLock::new(HashMap::new()),
+            memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
         }
     }
 
@@ -106,15 +107,16 @@ impl ZookeeperRegistry {
     ) -> ServiceInstancesChangedListener {
         let mut service_names = HashSet::new();
         service_names.insert(service_name.clone());
-        return ServiceInstancesChangedListener {
+        ServiceInstancesChangedListener {
             zk_client: Arc::clone(&self.zk_client),
-            path: path,
-
+            path,
             service_name: service_name.clone(),
-            listener: listener,
-        };
+            listener,
+        }
     }
 
+    // The /dubbo/mapping metadata is designed for application discovery; deprecated because interface discovery is currently used.
+    // #[deprecated]
     fn get_app_name(&self, service_name: String) -> String {
         let res = self
             .zk_client
@@ -127,25 +129,155 @@ impl ZookeeperRegistry {
         };
         s.to_string()
     }
+
+    pub fn get_client(&self) -> Arc<ZooKeeper> {
+        self.zk_client.clone()
+    }
+
+    // If the parent node does not exist in ZooKeeper, Err(ZkError::NoNode) will be returned.
+    pub fn create_path(
+        &self,
+        path: &str,
+        data: &str,
+        create_mode: CreateMode,
+    ) -> Result<(), StdError> {
+        if self.exists_path(path) {
+            self.zk_client
+                .set_data(path, data.as_bytes().to_vec(), None)
+                .unwrap_or_else(|_| panic!("set data to {} failed.", path));
+            return Ok(());
+        }
+        let zk_result = self.zk_client.create(
+            path,
+            data.as_bytes().to_vec(),
+            Acl::open_unsafe().clone(),
+            create_mode,
+        );
+        match zk_result {
+            Ok(_) => Ok(()),
+            Err(err) => {
+                error!("zk path {} parent not exists.", path);
+                Err(Box::try_from(err).unwrap())
+            }
+        }
+    }
+
+    // Avoids Err(ZkError::NoNode) when a parent node doesn't exist
+    pub fn create_path_with_parent_check(
+        &self,
+        path: &str,
+        data: &str,
+        create_mode: CreateMode,
+    ) -> Result<(), StdError> {
+        let nodes: Vec<&str> = path.split('/').collect();
+        let mut current: String = String::new();
+        let children = *nodes.last().unwrap();
+        for node_key in nodes {
+            if node_key.is_empty() {
+                continue;
+            };
+            current.push('/');
+            current.push_str(node_key);
+            if !self.exists_path(current.as_str()) {
+                let new_create_mode = match children == node_key {
+                    true => create_mode,
+                    false => CreateMode::Persistent,
+                };
+                let new_data = match children == node_key {
+                    true => data,
+                    false => "",
+                };
+                self.create_path(current.as_str(), new_data, new_create_mode)
+                    .unwrap();
+            }
+        }
+        Ok(())
+    }
+
+    pub fn delete_path(&self, path: &str) {
+        if self.exists_path(path) {
+            self.get_client().delete(path, None).unwrap()
+        }
+    }
+
+    pub fn exists_path(&self, path: &str) -> bool {
+        self.zk_client.exists(path, false).unwrap().is_some()
+    }
+
+    pub fn get_data(&self, path: &str, watch: bool) -> Option<String> {
+        if self.exists_path(path) {
+            let zk_result = self.zk_client.get_data(path, watch);
+            if let Ok(..) = zk_result {
+                Some(String::from_utf8(zk_result.unwrap().0).unwrap())
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
+impl Default for ZookeeperRegistry {
+    fn default() -> ZookeeperRegistry {
+        let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+            Ok(val) => val,
+            Err(_) => {
+                let default_connect_string = "localhost:2181";
+                info!(
+                    "No ZOOKEEPER_SERVERS env value, using {} as default.",
+                    default_connect_string
+                );
+                default_connect_string.to_string()
+            }
+        };
+        println!(
+            "using external registry with it's connect string {}",
+            zk_connect_string.as_str()
+        );
+        ZookeeperRegistry::new(zk_connect_string.as_str())
+    }
 }
 
 impl Registry for ZookeeperRegistry {
     type NotifyListener = MemoryNotifyListener;
 
     fn register(&mut self, url: Url) -> Result<(), StdError> {
-        todo!();
+        println!("register url: {}", url);
+        let zk_path = format!(
+            "/{}/{}/{}/{}",
+            DUBBO_KEY,
+            url.service_name,
+            PROVIDERS_KEY,
+            url.encoded_raw_url_string()
+        );
+        self.create_path_with_parent_check(zk_path.as_str(), LOCALHOST_IP, 
CreateMode::Ephemeral)?;
+        Ok(())
     }
 
     fn unregister(&mut self, url: Url) -> Result<(), StdError> {
-        todo!();
+        let zk_path = format!(
+            "/{}/{}/{}/{}",
+            DUBBO_KEY,
+            url.service_name,
+            PROVIDERS_KEY,
+            url.encoded_raw_url_string()
+        );
+        self.delete_path(zk_path.as_str());
+        Ok(())
     }
 
+    // used by consumers to discover changes to the providers
     fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> 
Result<(), StdError> {
-        let binding = url.get_service_name();
-        let service_name = binding.get(0).unwrap();
-        let app_name = self.get_app_name(service_name.clone());
-        let path = self.root_path.clone() + "/" + &app_name;
-        if self.listeners.read().unwrap().get(service_name).is_some() {
+        let service_name = url.get_service_name();
+        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, 
PROVIDERS_KEY);
+        if self
+            .listeners
+            .read()
+            .unwrap()
+            .get(service_name.as_str())
+            .is_some()
+        {
             return Ok(());
         }
 
@@ -156,37 +288,32 @@ impl Registry for ZookeeperRegistry {
             .insert(service_name.to_string(), Arc::clone(&arc_listener));
 
         let zk_listener = self.create_listener(
-            path.clone(),
+            zk_path.clone(),
             service_name.to_string(),
             Arc::clone(&arc_listener),
         );
 
-        let res = self.zk_client.get_children_w(&path, zk_listener);
-        let result: Vec<Url> = res
-            .unwrap()
-            .iter()
-            .map(|node_key| {
-                let zk_res = self.zk_client.get_data(
-                    &(self.root_path.clone() + "/" + &app_name + "/" + 
&node_key),
-                    false,
-                );
-                let vec_u8 = zk_res.unwrap().0;
-                let sstr = std::str::from_utf8(&vec_u8).unwrap();
-                let instance: ZkServiceInstance = 
serde_json::from_str(sstr).unwrap();
-                let url = Url::from_url(&format!(
-                    "triple://{}:{}/{}",
-                    instance.get_host(),
-                    instance.get_port(),
-                    service_name
-                ))
-                .unwrap();
-                url
-            })
-            .collect();
-
-        info!("notifing {}->{:?}", service_name, result);
+        let zk_changed_paths = self.zk_client.get_children_w(&zk_path, 
zk_listener);
+        let result = match zk_changed_paths {
+            Err(err) => {
+                error!("zk subscribe error: {}", err);
+                Vec::new()
+            }
+            Ok(urls) => urls
+                .iter()
+                .map(|node_key| {
+                    let provider_url: Url = urlencoding::decode(node_key)
+                        .unwrap()
+                        .to_string()
+                        .as_str()
+                        .into();
+                    provider_url
+                })
+                .collect(),
+        };
+        info!("notifying {}->{:?}", service_name, result);
         arc_listener.notify(ServiceEvent {
-            key: service_name.to_string(),
+            key: service_name,
             action: String::from("ADD"),
             service: result,
         });
@@ -201,7 +328,6 @@ impl Registry for ZookeeperRegistry {
 pub struct ServiceInstancesChangedListener {
     zk_client: Arc<ZooKeeper>,
     path: String,
-
     service_name: String,
     listener: Arc<MemoryNotifyListener>,
 }
@@ -213,40 +339,26 @@ impl Watcher for ServiceInstancesChangedListener {
             let event_path = path.clone();
             let dirs = self
                 .zk_client
-                .get_children(&event_path.clone(), false)
+                .get_children(&event_path, false)
                 .expect("msg");
             let result: Vec<Url> = dirs
                 .iter()
                 .map(|node_key| {
-                    let zk_res = self
-                        .zk_client
-                        .get_data(&(event_path.clone() + "/" + node_key), 
false);
-                    let vec_u8 = zk_res.unwrap().0;
-                    let sstr = std::str::from_utf8(&vec_u8).unwrap();
-                    let instance: ZkServiceInstance = 
serde_json::from_str(sstr).unwrap();
-                    let url = Url::from_url(&format!(
-                        "triple://{}:{}/{}",
-                        instance.get_host(),
-                        instance.get_port(),
-                        self.service_name
-                    ))
-                    .unwrap();
-                    url
+                    let provider_url: Url = node_key.as_str().into();
+                    provider_url
                 })
                 .collect();
-
             let res = self.zk_client.get_children_w(
                 &path,
                 ServiceInstancesChangedListener {
                     zk_client: Arc::clone(&self.zk_client),
                     path: path.clone(),
-
                     service_name: self.service_name.clone(),
                     listener: Arc::clone(&self.listener),
                 },
             );
 
-            info!("notifing {}->{:?}", self.service_name, result);
+            info!("notify {}->{:?}", self.service_name, result);
             self.listener.notify(ServiceEvent {
                 key: self.service_name.clone(),
                 action: String::from("ADD"),
@@ -258,48 +370,102 @@ impl Watcher for ServiceInstancesChangedListener {
 
 impl NotifyListener for ServiceInstancesChangedListener {
     fn notify(&self, event: ServiceEvent) {
-        todo!()
+        self.listener.notify(event);
     }
 
     fn notify_all(&self, event: ServiceEvent) {
-        todo!()
+        self.listener.notify(event);
     }
 }
 
-#[test]
-fn it_works() {
-    let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
-    let zk_client =
-        ZooKeeper::connect(connect_string, Duration::from_secs(15), 
LoggingWatcher).unwrap();
-    let watcher = Arc::new(Some(TestZkWatcher {
-        watcher: Arc::new(None),
-    }));
-    watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
-    let x = watcher.as_ref().expect("");
-    zk_client.get_children_w("/test", x);
-    zk_client.delete("/test/a", None);
-    zk_client.delete("/test/b", None);
-    let zk_res = zk_client.create(
-        "/test/a",
-        vec![1, 3],
-        Acl::open_unsafe().clone(),
-        CreateMode::Ephemeral,
-    );
-    let zk_res = zk_client.create(
-        "/test/b",
-        vec![1, 3],
-        Acl::open_unsafe().clone(),
-        CreateMode::Ephemeral,
-    );
-    zk_client.close();
+impl ClusterRegistryIntegration for ZookeeperRegistry {
+    fn get_invoker(registry: BoxRegistry) -> Option<Arc<ClusterInvoker>> {
+        todo!()
+    }
 }
 
-struct TestZkWatcher {
-    pub watcher: Arc<Option<TestZkWatcher>>,
-}
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
 
-impl Watcher for TestZkWatcher {
-    fn handle(&self, event: WatchedEvent) {
-        println!("event: {:?}", event);
+    use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
+
+    use crate::zookeeper_registry::ZookeeperRegistry;
+
+    struct TestZkWatcher {
+        pub watcher: Arc<Option<TestZkWatcher>>,
+    }
+
+    impl Watcher for TestZkWatcher {
+        fn handle(&self, event: WatchedEvent) {
+            println!("event: {:?}", event);
+        }
+    }
+
+    #[test]
+    fn zk_read_write_watcher() {
+        // https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
+        // using ENV to set zookeeper server urls
+        let zkr = ZookeeperRegistry::default();
+        let zk_client = zkr.get_client();
+        let watcher = TestZkWatcher {
+            watcher: Arc::new(None),
+        };
+        if zk_client.exists("/test", true).is_err() {
+            zk_client
+                .create(
+                    "/test",
+                    vec![1, 3],
+                    Acl::open_unsafe().clone(),
+                    CreateMode::Ephemeral,
+                )
+                .unwrap();
+        }
+        let zk_res = zk_client.create(
+            "/test",
+            "hello".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let result = zk_client.get_children_w("/test", watcher);
+        assert!(result.is_ok());
+        if zk_client.exists("/test/a", true).is_err() {
+            zk_client.delete("/test/a", None).unwrap();
+        }
+        if zk_client.exists("/test/a", true).is_err() {
+            zk_client.delete("/test/b", None).unwrap();
+        }
+        let zk_res = zk_client.create(
+            "/test/a",
+            "hello".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let zk_res = zk_client.create(
+            "/test/b",
+            "world".into(),
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+        let test_a_result = zk_client.get_data("/test", true);
+        assert!(test_a_result.is_ok());
+        let vec1 = test_a_result.unwrap().0;
+        // data in /test should equal "hello"
+        assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
+        zk_client.close().unwrap()
+    }
+
+    #[test]
+    fn create_path_with_parent_check() {
+        let zkr = ZookeeperRegistry::default();
+        let path = "/du1bbo/test11111";
+        let data = "hello";
+        // creating a child under a non-existent parent returns a NoNode error.
+        // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
+        // assert!(result.is_err());
+        let create_with_parent_check_result =
+            zkr.create_path_with_parent_check(path, data, 
CreateMode::Ephemeral);
+        assert!(create_with_parent_check_result.is_ok());
+        assert_eq!(data, zkr.get_data(path, false).unwrap());
     }
 }
diff --git a/scripts/ci-check.sh b/scripts/ci-check.sh
new file mode 100755
index 0000000..5ffd46d
--- /dev/null
+++ b/scripts/ci-check.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#/*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements.  See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+cargo fmt --all -- --check
+# use stable channel
+cargo check
+target/debug/greeter-server && target/debug/greeter-client && sleep 3s ;

Reply via email to