This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new aeeb354  Implement service discovery (#103)
aeeb354 is described below

commit aeeb3546be6a4ba01098fb77de35f29ff3b167e0
Author: Robert LU <[email protected]>
AuthorDate: Thu Feb 9 20:10:49 2023 +0800

    Implement service discovery (#103)
    
    * implement service discovery
    
    * update github-actions.yml
    
    * use new pattern
    
    * Update zookeeper_registry.rs
---
 .github/workflows/github-actions.yml               |  42 +--
 Cargo.toml                                         |   1 +
 config/src/config.rs                               |   2 +-
 dubbo-build/src/client.rs                          | 134 ++++++---
 dubbo/Cargo.toml                                   |   1 +
 dubbo/src/cluster/directory.rs                     | 150 ++++++++++
 dubbo/src/cluster/mod.rs                           |  18 ++
 dubbo/src/codegen.rs                               |   6 +
 dubbo/src/invocation.rs                            |  32 +++
 dubbo/src/lib.rs                                   |   1 +
 dubbo/src/protocol/triple/triple_invoker.rs        |  18 +-
 dubbo/src/protocol/triple/triple_protocol.rs       |   5 +-
 dubbo/src/registry/memory_registry.rs              |  12 +-
 dubbo/src/registry/mod.rs                          |  25 +-
 dubbo/src/registry/protocol.rs                     |   4 +-
 dubbo/src/triple/client/builder.rs                 |  67 ++---
 dubbo/src/triple/client/triple.rs                  | 127 ++++-----
 examples/echo/Cargo.toml                           |   3 +
 examples/echo/src/echo/client.rs                   |   9 +-
 .../hello_echo.rs => echo/grpc.examples.echo.rs}   |  54 ++--
 examples/echo/src/protos/hello_echo.rs             |  54 ++--
 examples/greeter/Cargo.toml                        |   3 +
 examples/greeter/proto/greeter.proto               |   2 +-
 examples/greeter/src/greeter/client.rs             |  23 +-
 registry-zookeeper/Cargo.toml                      |  14 +
 registry-zookeeper/LICENSE                         | 202 ++++++++++++++
 registry-zookeeper/src/lib.rs                      |  27 ++
 registry-zookeeper/src/zookeeper_registry.rs       | 306 +++++++++++++++++++++
 28 files changed, 1095 insertions(+), 247 deletions(-)

diff --git a/.github/workflows/github-actions.yml 
b/.github/workflows/github-actions.yml
index e69ee90..d6c85bb 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -23,6 +23,18 @@ jobs:
       - uses: actions-rs/toolchain@v1
         with:
           toolchain: stable
+      - name: Set up cargo cache
+        uses: actions/cache@v3
+        continue-on-error: false
+        with:
+          path: |
+            ~/.cargo/bin/
+            ~/.cargo/registry/index/
+            ~/.cargo/registry/cache/
+            ~/.cargo/git/db/
+            target/
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: ${{ runner.os }}-cargo-
       - name: setup protoc
         run: |
           mkdir $HOME/protoc/ -p &&
@@ -32,19 +44,9 @@ jobs:
           unzip /tmp/protoc-21.9-linux-x86_64.zip &&
           echo "$HOME/protoc/bin" >> $GITHUB_PATH
         shell: bash
-      - run: cargo check
-
-  fmt:
-    name: Rustfmt
-    runs-on: ubuntu-latest
-
-    steps:
-      - uses: actions/checkout@main
-      - uses: actions-rs/toolchain@v1
-        with:
-          toolchain: stable
       - run: rustup component add rustfmt
       - run: cargo fmt --all -- --check
+      - run: cargo check
 
   example-greeter:
     name: example/greeter
@@ -54,15 +56,6 @@ jobs:
       - uses: actions-rs/toolchain@v1
         with:
           toolchain: stable
-      - name: setup protoc
-        run: |
-          mkdir $HOME/protoc/ -p &&
-          cd $HOME/protoc/ &&
-          curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
-          
https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip
 &&
-          unzip /tmp/protoc-21.9-linux-x86_64.zip &&
-          echo "$HOME/protoc/bin" >> $GITHUB_PATH
-        shell: bash
       - name: Set up cargo cache
         uses: actions/cache@v3
         continue-on-error: false
@@ -75,6 +68,15 @@ jobs:
             target/
           key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
           restore-keys: ${{ runner.os }}-cargo-
+      - name: setup protoc
+        run: |
+          mkdir $HOME/protoc/ -p &&
+          cd $HOME/protoc/ &&
+          curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
+          
https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip
 &&
+          unzip /tmp/protoc-21.9-linux-x86_64.zip &&
+          echo "$HOME/protoc/bin" >> $GITHUB_PATH
+        shell: bash
       - run: cargo build
         working-directory: examples/greeter
       - name: example greeter
diff --git a/Cargo.toml b/Cargo.toml
index 140c079..d3942a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 members = [
   "xds",
   "registry",
+  "registry-zookeeper",
   "metadata",
   "common",
   "config",
diff --git a/config/src/config.rs b/config/src/config.rs
index 672319a..500fd33 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -77,7 +77,7 @@ impl RootConfig {
                 v
             }
             Err(err) => {
-                tracing::error!(
+                tracing::warn!(
                     "error loading config_path: {:?}, use default path: {:?}",
                     err,
                     DUBBO_CONFIG_PATH
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 8d58ab1..05e8e89 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -66,11 +66,11 @@ pub fn generate<T: Service>(
             #service_doc
             #(#struct_attributes)*
             #[derive(Debug, Clone, Default)]
-            pub struct #service_ident<T> {
-                inner: TripleClient<T>,
+            pub struct #service_ident {
+                inner: TripleClient,
             }
 
-            impl #service_ident<ClientBoxService> {
+            impl #service_ident {
                 pub fn connect(host: String) -> Self {
                     let cli = TripleClient::connect(host);
                     #service_ident {
@@ -78,33 +78,27 @@ pub fn generate<T: Service>(
                     }
                 }
 
-                pub fn build(builder: ClientBuilder) -> Self {
-                    Self {
-                        inner: TripleClient::with_builder(builder),
-                    }
-                }
-            }
+                // pub fn build(builder: ClientBuilder) -> Self {
+                //     Self {
+                //         inner: TripleClient::new(builder),
+                //     }
+                // }
 
-            impl<T> #service_ident<T>
-            where
-                T: Service<http::Request<hyperBody>, Response = 
http::Response<BoxBody>>,
-                T::Error: Into<StdError>,
-            {
-                pub fn new(inner: T, builder: ClientBuilder) -> Self {
+                pub fn new(builder: ClientBuilder) -> Self {
                     Self {
-                        inner: TripleClient::new(inner, builder),
+                        inner: TripleClient::new(builder),
                     }
                 }
 
-                pub fn with_filter<F>(self, filter: F) -> 
#service_ident<FilterService<T, F>>
-                where
-                    F: Filter,
-                {
-                    let inner = self.inner.with_filter(filter);
-                    #service_ident {
-                        inner,
-                    }
-                }
+                // pub fn with_filter<F>(self, filter: F) -> 
#service_ident<FilterService<T, F>>
+                // where
+                //     F: Filter,
+                // {
+                //     let inner = self.inner.with_filter(filter);
+                //     #service_ident {
+                //         inner,
+                //     }
+                // }
 
                 #methods
 
@@ -123,6 +117,12 @@ fn generate_methods<T: Service>(
     let package = if emit_package { service.package() } else { "" };
 
     for method in service.methods() {
+        let service_unique_name = format!(
+            "{}{}{}",
+            package,
+            if package.is_empty() { "" } else { "." },
+            service.identifier()
+        );
         let path = format!(
             "/{}{}{}/{}",
             package,
@@ -134,14 +134,34 @@ fn generate_methods<T: Service>(
         stream.extend(generate_doc_comments(method.comment()));
 
         let method = match (method.client_streaming(), 
method.server_streaming()) {
-            (false, false) => generate_unary(&method, proto_path, 
compile_well_known_types, path),
-            (false, true) => {
-                generate_server_streaming(&method, proto_path, 
compile_well_known_types, path)
-            }
-            (true, false) => {
-                generate_client_streaming(&method, proto_path, 
compile_well_known_types, path)
-            }
-            (true, true) => generate_streaming(&method, proto_path, 
compile_well_known_types, path),
+            (false, false) => generate_unary(
+                service_unique_name,
+                &method,
+                proto_path,
+                compile_well_known_types,
+                path,
+            ),
+            (false, true) => generate_server_streaming(
+                service_unique_name,
+                &method,
+                proto_path,
+                compile_well_known_types,
+                path,
+            ),
+            (true, false) => generate_client_streaming(
+                service_unique_name,
+                &method,
+                proto_path,
+                compile_well_known_types,
+                path,
+            ),
+            (true, true) => generate_streaming(
+                service_unique_name,
+                &method,
+                proto_path,
+                compile_well_known_types,
+                path,
+            ),
         };
 
         stream.extend(method);
@@ -151,6 +171,7 @@ fn generate_methods<T: Service>(
 }
 
 fn generate_unary<T: Method>(
+    service_unique_name: String,
     method: &T,
     proto_path: &str,
     compile_well_known_types: bool,
@@ -159,6 +180,7 @@ fn generate_unary<T: Method>(
     let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
+    let method_name = method.identifier();
 
     quote! {
         pub async fn #ident(
@@ -166,19 +188,22 @@ fn generate_unary<T: Method>(
             request: Request<#request>,
         ) -> Result<Response<#response>, dubbo::status::Status> {
            let codec = #codec_name::<#request, #response>::default();
+           let invocation = RpcInvocation::default()
+            .with_servie_unique_name(String::from(#service_unique_name))
+            .with_method_name(String::from(#method_name));
            let path = http::uri::PathAndQuery::from_static(#path);
-           self.inner
-            .unary(
+           self.inner.unary(
                 request,
                 codec,
                 path,
-            )
-            .await
+                invocation,
+            ).await
         }
     }
 }
 
 fn generate_server_streaming<T: Method>(
+    service_unique_name: String,
     method: &T,
     proto_path: &str,
     compile_well_known_types: bool,
@@ -188,6 +213,7 @@ fn generate_server_streaming<T: Method>(
     let ident = format_ident!("{}", method.name());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
+    let method_name = method.identifier();
 
     quote! {
         pub async fn #ident(
@@ -196,13 +222,22 @@ fn generate_server_streaming<T: Method>(
         ) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
 
             let codec = #codec_name::<#request, #response>::default();
+            let invocation = RpcInvocation::default()
+             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
-            self.inner.server_streaming(request, codec, path).await
+            self.inner.server_streaming(
+                request,
+                codec,
+                path,
+                invocation,
+            ).await
         }
     }
 }
 
 fn generate_client_streaming<T: Method>(
+    service_unique_name: String,
     method: &T,
     proto_path: &str,
     compile_well_known_types: bool,
@@ -212,6 +247,7 @@ fn generate_client_streaming<T: Method>(
     let ident = format_ident!("{}", method.name());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
+    let method_name = method.identifier();
 
     quote! {
         pub async fn #ident(
@@ -219,13 +255,22 @@ fn generate_client_streaming<T: Method>(
             request: impl IntoStreamingRequest<Message = #request>
         ) -> Result<Response<#response>, dubbo::status::Status> {
             let codec = #codec_name::<#request, #response>::default();
+            let invocation = RpcInvocation::default()
+             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
-            self.inner.client_streaming(request, codec, path).await
+            self.inner.client_streaming(
+                request,
+                codec,
+                path,
+                invocation,
+            ).await
         }
     }
 }
 
 fn generate_streaming<T: Method>(
+    service_unique_name: String,
     method: &T,
     proto_path: &str,
     compile_well_known_types: bool,
@@ -235,6 +280,7 @@ fn generate_streaming<T: Method>(
     let ident = format_ident!("{}", method.name());
 
     let (request, response) = method.request_response_name(proto_path, 
compile_well_known_types);
+    let method_name = method.identifier();
 
     quote! {
         pub async fn #ident(
@@ -242,8 +288,16 @@ fn generate_streaming<T: Method>(
             request: impl IntoStreamingRequest<Message = #request>
         ) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
             let codec = #codec_name::<#request, #response>::default();
+            let invocation = RpcInvocation::default()
+             .with_servie_unique_name(String::from(#service_unique_name))
+             .with_method_name(String::from(#method_name));
             let path = http::uri::PathAndQuery::from_static(#path);
-            self.inner.bidi_streaming(request, codec, path).await
+            self.inner.bidi_streaming(
+                request,
+                codec,
+                path,
+                invocation,
+            ).await
         }
     }
 }
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 145b4a0..6a51730 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -24,6 +24,7 @@ async-trait = "0.1.56"
 tower-layer = "0.3"
 bytes = "1.0"
 pin-project = "1.0"
+rand = "0.8.5"
 serde_json = "1.0.82"
 serde = {version="1.0.138", features = ["derive"]}
 futures = "0.3"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
new file mode 100644
index 0000000..4952de7
--- /dev/null
+++ b/dubbo/src/cluster/directory.rs
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::str::FromStr;
+use std::sync::{Arc, RwLock};
+
+use crate::common::url::Url;
+use crate::invocation::{Invocation, RpcInvocation};
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, RegistryWrapper};
+
+/// Directory.
+///
+/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
+pub trait Directory: Debug + DirectoryClone {
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+}
+
+pub trait DirectoryClone {
+    fn clone_box(&self) -> Box<dyn Directory>;
+}
+
+impl<T> DirectoryClone for T
+where
+    T: 'static + Directory + Clone,
+{
+    fn clone_box(&self) -> Box<dyn Directory> {
+        Box::new(self.clone())
+    }
+}
+
+impl Clone for Box<dyn Directory> {
+    fn clone(&self) -> Box<dyn Directory> {
+        self.clone_box()
+    }
+}
+
+#[derive(Debug)]
+pub struct StaticDirectory {
+    uri: http::Uri,
+}
+
+impl StaticDirectory {
+    pub fn new(host: &str) -> StaticDirectory {
+        let uri = match http::Uri::from_str(host) {
+            Ok(v) => v,
+            Err(err) => {
+                tracing::error!("http uri parse error: {}, host: {}", err, 
host);
+                panic!("http uri parse error: {}, host: {}", err, host)
+            }
+        };
+        StaticDirectory { uri: uri }
+    }
+
+    pub fn from_uri(uri: &http::Uri) -> StaticDirectory {
+        StaticDirectory { uri: uri.clone() }
+    }
+}
+
+impl Directory for StaticDirectory {
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+        let url = Url::from_url(&format!(
+            "triple://{}:{}/{}",
+            self.uri.host().unwrap(),
+            self.uri.port().unwrap(),
+            invocation.get_target_service_unique_name(),
+        ))
+        .unwrap();
+        vec![url]
+    }
+}
+
+impl DirectoryClone for StaticDirectory {
+    fn clone_box(&self) -> Box<dyn Directory> {
+        Box::new(StaticDirectory {
+            uri: self.uri.clone(),
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct RegistryDirectory {
+    registry: RegistryWrapper,
+    service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
+
+impl RegistryDirectory {
+    pub fn new(registry: BoxRegistry) -> RegistryDirectory {
+        RegistryDirectory {
+            registry: RegistryWrapper {
+                registry: Some(registry),
+            },
+            service_instances: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+impl DirectoryClone for RegistryDirectory {
+    fn clone_box(&self) -> Box<dyn Directory> {
+        todo!()
+    }
+}
+
+impl Directory for RegistryDirectory {
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+        let service_name = invocation.get_target_service_unique_name();
+
+        let url = Url::from_url(&format!(
+            "triple://{}:{}/{}",
+            "127.0.0.1", "8888", service_name
+        ))
+        .unwrap();
+
+        self.registry
+            .registry
+            .as_ref()
+            .expect("msg")
+            .subscribe(
+                url,
+                MemoryNotifyListener {
+                    service_instances: Arc::clone(&self.service_instances),
+                },
+            )
+            .expect("subscribe");
+
+        let map = self
+            .service_instances
+            .read()
+            .expect("service_instances.read");
+        let binding = Vec::new();
+        let url_vec = map.get(&service_name).unwrap_or(&binding);
+        url_vec.to_vec()
+    }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
new file mode 100644
index 0000000..84980ea
--- /dev/null
+++ b/dubbo/src/cluster/mod.rs
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod directory;
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index ef65646..3535fbf 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,9 +24,15 @@ pub use http_body::Body;
 pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
+pub use super::invocation::RpcInvocation;
 pub use super::invocation::{IntoStreamingRequest, Request, Response};
 pub use super::protocol::triple::triple_invoker::TripleInvoker;
 pub use super::protocol::Invoker;
+pub use super::registry::BoxRegistry;
+pub use super::registry::Registry;
+pub use super::registry::RegistryWrapper;
 pub use super::triple::client::TripleClient;
 pub use super::triple::codec::prost::ProstCodec;
 pub use super::triple::codec::Codec;
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 5a80e9a..423e9c7 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -189,3 +189,35 @@ impl Metadata {
         header
     }
 }
+
+pub trait Invocation {
+    fn get_target_service_unique_name(&self) -> String;
+    fn get_method_name(&self) -> String;
+}
+
+#[derive(Default)]
+pub struct RpcInvocation {
+    target_service_unique_name: String,
+    method_name: String,
+}
+
+impl RpcInvocation {
+    pub fn with_servie_unique_name(mut self, service_unique_name: String) -> 
Self {
+        self.target_service_unique_name = service_unique_name;
+        self
+    }
+    pub fn with_method_name(mut self, method_name: String) -> Self {
+        self.method_name = method_name;
+        self
+    }
+}
+
+impl Invocation for RpcInvocation {
+    fn get_target_service_unique_name(&self) -> String {
+        self.target_service_unique_name.clone()
+    }
+
+    fn get_method_name(&self) -> String {
+        self.method_name.clone()
+    }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 68bbea5..bf97ef7 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+pub mod cluster;
 pub mod codegen;
 pub mod common;
 pub mod context;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs 
b/dubbo/src/protocol/triple/triple_invoker.rs
index 1cbe2ed..2c03fa7 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-use std::str::FromStr;
-
 use tower_service::Service;
 
 use crate::common::url::Url;
 use crate::protocol::Invoker;
-use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
+use crate::triple::client::builder::ClientBoxService;
 
 pub struct TripleInvoker {
     url: Url,
@@ -29,13 +27,13 @@ pub struct TripleInvoker {
 }
 
 impl TripleInvoker {
-    pub fn new(url: Url) -> TripleInvoker {
-        let uri = http::Uri::from_str(&url.to_url()).unwrap();
-        Self {
-            url,
-            conn: ClientBuilder::from(uri).connect(),
-        }
-    }
+    // pub fn new(url: Url) -> TripleInvoker {
+    //     let uri = http::Uri::from_str(&url.to_url()).unwrap();
+    //     Self {
+    //         url,
+    //         conn: ClientBuilder::from_uri(&uri).build()connect(),
+    //     }
+    // }
 }
 
 impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs 
b/dubbo/src/protocol/triple/triple_protocol.rs
index 037bfb7..4d56b7a 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -68,8 +68,9 @@ impl Protocol for TripleProtocol {
         Box::new(TripleExporter::new())
     }
 
-    async fn refer(self, url: Url) -> Self::Invoker {
-        TripleInvoker::new(url)
+    async fn refer(self, _url: Url) -> Self::Invoker {
+        todo!()
+        // TripleInvoker::new(url)
         // Self::Invoker
     }
 }
diff --git a/dubbo/src/registry/memory_registry.rs 
b/dubbo/src/registry/memory_registry.rs
index 67df5c7..4d0350e 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -20,6 +20,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::RwLock;
 
+use crate::common::url::Url;
+
 use super::{NotifyListener, Registry};
 
 // 从url中获取服务注册的元数据
@@ -100,11 +102,17 @@ impl Registry for MemoryRegistry {
     }
 }
 
-pub struct MemoryNotifyListener {}
+pub struct MemoryNotifyListener {
+    pub service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
 
 impl NotifyListener for MemoryNotifyListener {
     fn notify(&self, event: super::ServiceEvent) {
-        todo!()
+        let mut map = self.service_instances.write().expect("msg");
+        match event.action.as_str() {
+            "ADD" => map.insert(event.key, event.service),
+            &_ => todo!(),
+        };
     }
 
     fn notify_all(&self, event: super::ServiceEvent) {
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index cbae16b..8c56692 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -19,6 +19,8 @@
 pub mod memory_registry;
 pub mod protocol;
 
+use std::fmt::Debug;
+
 use crate::common::url::Url;
 
 pub trait Registry {
@@ -37,10 +39,27 @@ pub trait NotifyListener {
 }
 
 pub struct ServiceEvent {
-    key: String,
-    action: String,
-    service: Url,
+    pub key: String,
+    pub action: String,
+    pub service: Vec<Url>,
 }
 
 pub type BoxRegistry =
     Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> + 
Send + Sync>;
+
+#[derive(Default)]
+pub struct RegistryWrapper {
+    pub registry: Option<Box<dyn Registry<NotifyListener = 
memory_registry::MemoryNotifyListener>>>,
+}
+
+impl Clone for RegistryWrapper {
+    fn clone(&self) -> Self {
+        Self { registry: None }
+    }
+}
+
+impl Debug for RegistryWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RegistryWrapper").finish()
+    }
+}
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 933975c..bf6a8a5 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -20,7 +20,6 @@ use std::sync::{Arc, RwLock};
 
 use super::memory_registry::MemoryRegistry;
 use super::BoxRegistry;
-use crate::codegen::TripleInvoker;
 use crate::common::url::Url;
 use crate::protocol::triple::triple_exporter::TripleExporter;
 use crate::protocol::triple::triple_protocol::TripleProtocol;
@@ -104,6 +103,7 @@ impl Protocol for RegistryProtocol {
         // get Registry from registry_url
         // init directory based on registry_url and Registry
         // init Cluster based on Directory generates Invoker
-        Box::new(TripleInvoker::new(url))
+        todo!()
+        //Box::new(TripleInvoker::new(url))
     }
 }
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index 21afc0c..40a6ffb 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,35 +15,46 @@
  * limitations under the License.
  */
 
-use http::Uri;
-use hyper::client::conn::Builder;
-use tokio::time::Duration;
-use tower::ServiceBuilder;
-
-use crate::triple::transport::connection::Connection;
+use crate::cluster::directory::StaticDirectory;
+use crate::codegen::Directory;
+use crate::triple::compression::CompressionEncoding;
 use crate::utils::boxed::BoxService;
 
+use super::TripleClient;
+
 pub type ClientBoxService =
     BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>, 
crate::Error>;
 
 #[derive(Clone, Debug, Default)]
 pub struct ClientBuilder {
-    pub uri: Uri,
     pub timeout: Option<u64>,
     pub connector: &'static str,
+    directory: Option<Box<dyn Directory>>,
 }
 
 impl ClientBuilder {
     pub fn new() -> ClientBuilder {
         ClientBuilder {
-            uri: Uri::builder().build().unwrap(),
             timeout: None,
             connector: "",
+            directory: None,
+        }
+    }
+
+    pub fn from_static(host: &str) -> ClientBuilder {
+        Self {
+            timeout: None,
+            connector: "",
+            directory: Some(Box::new(StaticDirectory::new(&host))),
         }
     }
 
-    pub fn from_static(s: &'static str) -> ClientBuilder {
-        Self::from(Uri::from_static(s))
+    pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
+        Self {
+            timeout: None,
+            connector: "",
+            directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
+        }
     }
 
     pub fn with_timeout(self, timeout: u64) -> Self {
@@ -53,9 +64,17 @@ impl ClientBuilder {
         }
     }
 
+    /// host: http://0.0.0.0:8888
+    pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+        Self {
+            directory: Some(directory),
+            ..self
+        }
+    }
+
     pub fn with_host(self, host: &'static str) -> Self {
         Self {
-            uri: Uri::from_static(host),
+            directory: Some(Box::new(StaticDirectory::new(&host))),
             ..self
         }
     }
@@ -67,28 +86,10 @@ impl ClientBuilder {
         }
     }
 
-    pub fn connect(self) -> ClientBoxService {
-        let builder = ServiceBuilder::new();
-        let timeout = self.timeout.unwrap_or(5);
-        let builder = builder.timeout(Duration::from_secs(timeout));
-
-        let mut b = Builder::new();
-        let hyper_builder = b.http2_only(true);
-        let conn = Connection::new()
-            .with_host(self.uri.clone())
-            .with_connector(self.connector)
-            .with_builder(hyper_builder.to_owned());
-
-        BoxService::new(builder.service(conn))
-    }
-}
-
-impl From<Uri> for ClientBuilder {
-    fn from(u: Uri) -> Self {
-        Self {
-            uri: u,
-            timeout: None,
-            connector: "tcp",
+    pub fn build(self) -> TripleClient {
+        TripleClient {
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
+            directory: self.directory,
         }
     }
 }
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index d4864ef..b9f6492 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -18,12 +18,15 @@
 use std::str::FromStr;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
+
 use http::HeaderValue;
+use rand::prelude::SliceRandom;
 use tower_service::Service;
 
-use super::builder::{ClientBoxService, ClientBuilder};
-use crate::filter::service::FilterService;
-use crate::filter::Filter;
+use super::super::transport::connection::Connection;
+use super::builder::ClientBuilder;
+use crate::codegen::{Directory, RpcInvocation};
+
 use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
 use crate::triple::codec::Codec;
 use crate::triple::compression::CompressionEncoding;
@@ -31,77 +34,29 @@ use crate::triple::decode::Decoding;
 use crate::triple::encode::encode;
 
 #[derive(Debug, Clone, Default)]
-pub struct TripleClient<T> {
-    builder: Option<ClientBuilder>,
-    inner: T,
-    send_compression_encoding: Option<CompressionEncoding>,
+pub struct TripleClient {
+    pub(crate) send_compression_encoding: Option<CompressionEncoding>,
+    pub(crate) directory: Option<Box<dyn Directory>>,
 }
 
-impl TripleClient<ClientBoxService> {
+impl TripleClient {
     pub fn connect(host: String) -> Self {
-        let uri = match http::Uri::from_str(&host) {
-            Ok(v) => v,
-            Err(err) => {
-                tracing::error!("http uri parse error: {}, host: {}", err, 
host);
-                panic!("http uri parse error: {}, host: {}", err, host)
-            }
-        };
+        let builder = ClientBuilder::from_static(&host);
 
-        let builder = ClientBuilder::from(uri);
-
-        TripleClient {
-            builder: Some(builder.clone()),
-            inner: builder.connect(),
-            send_compression_encoding: Some(CompressionEncoding::Gzip),
-        }
+        builder.build()
     }
 
-    pub fn with_builder(builder: ClientBuilder) -> Self {
-        TripleClient {
-            builder: Some(builder.clone()),
-            inner: builder.connect(),
-            send_compression_encoding: Some(CompressionEncoding::Gzip),
-        }
+    pub fn new(builder: ClientBuilder) -> Self {
+        builder.build()
     }
-}
 
-impl<T> TripleClient<T> {
-    pub fn new(inner: T, builder: ClientBuilder) -> Self {
-        TripleClient {
-            builder: Some(builder),
-            inner,
-            send_compression_encoding: Some(CompressionEncoding::Gzip),
-        }
-    }
-
-    pub fn with_filter<F>(self, filter: F) -> TripleClient<FilterService<T, F>>
-    where
-        F: Filter,
-    {
-        TripleClient::new(
-            FilterService::new(self.inner, filter),
-            self.builder.unwrap(),
-        )
-    }
-}
-
-impl<T> TripleClient<T>
-where
-    T: Service<http::Request<hyper::Body>, Response = 
http::Response<crate::BoxBody>>,
-    T::Error: Into<crate::Error>,
-{
     fn map_request(
         &self,
+        uri: http::Uri,
         path: http::uri::PathAndQuery,
         body: hyper::Body,
     ) -> http::Request<hyper::Body> {
-        let mut parts = match self.builder.as_ref() {
-            Some(v) => v.to_owned().uri.into_parts(),
-            None => {
-                tracing::error!("client host is empty");
-                return http::Request::new(hyper::Body::empty());
-            }
-        };
+        let mut parts = uri.into_parts();
         parts.path_and_query = Some(path);
 
         let uri = http::Uri::from_parts(parts).unwrap();
@@ -127,7 +82,7 @@ where
         );
         req.headers_mut().insert(
             "content-type",
-            HeaderValue::from_static("application/grpc+json"),
+            HeaderValue::from_static("application/grpc+proto"),
         );
         req.headers_mut()
             .insert("user-agent", 
HeaderValue::from_static("dubbo-rust/0.1.0"));
@@ -172,6 +127,7 @@ where
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
+        invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -187,10 +143,15 @@ where
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
 
-        let req = self.map_request(path, body);
+        let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
+
+        let req = self.map_request(http_uri.clone(), path, body);
 
-        let response = self
-            .inner
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
@@ -228,6 +189,7 @@ where
         req: impl IntoStreamingRequest<Message = M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
+        invocation: RpcInvocation,
     ) -> Result<Response<Decoding<M2>>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -243,10 +205,15 @@ where
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
 
-        let req = self.map_request(path, body);
+        let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
+
+        let req = self.map_request(http_uri.clone(), path, body);
 
-        let response = self
-            .inner
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
@@ -268,6 +235,7 @@ where
         req: impl IntoStreamingRequest<Message = M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
+        invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -283,10 +251,15 @@ where
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
 
-        let req = self.map_request(path, body);
+        let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
 
-        let response = self
-            .inner
+        let req = self.map_request(http_uri.clone(), path, body);
+
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
@@ -324,6 +297,7 @@ where
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
+        invocation: RpcInvocation,
     ) -> Result<Response<Decoding<M2>>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -339,10 +313,15 @@ where
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
 
-        let req = self.map_request(path, body);
+        let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
+
+        let req = self.map_request(http_uri.clone(), path, body);
 
-        let response = self
-            .inner
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 9e794b0..f82ef80 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -23,8 +23,11 @@ prost = "0.10.4"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 
+hyper = { version = "0.14.19", features = ["full"]}
+
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = 
"0.2.0"}
 
 [build-dependencies]
 dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 060a17f..7312f72 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -31,10 +31,11 @@ impl Filter for FakeFilter {
 
 #[tokio::main]
 async fn main() {
-    let builder = ClientBuilder::new()
-        .with_connector("unix")
-        .with_host("unix://127.0.0.1:8888");
-    let mut cli = EchoClient::build(builder);
+    // let builder = ClientBuilder::new()
+    //     .with_connector("unix")
+    //     .with_host("unix://127.0.0.1:8888");
+    let builder = 
ClientBuilder::from_static(&"http://127.0.0.1:8888";).with_timeout(1000000);
+    let mut cli = EchoClient::new(builder);
     // let mut unary_cli = cli.clone().with_filter(FakeFilter {});
     // let mut cli = 
EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888";));
     let resp = cli
diff --git a/examples/echo/src/protos/hello_echo.rs 
b/examples/echo/src/echo/grpc.examples.echo.rs
similarity index 90%
copy from examples/echo/src/protos/hello_echo.rs
copy to examples/echo/src/echo/grpc.examples.echo.rs
index 08a1d29..9c487e6 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/echo/grpc.examples.echo.rs
@@ -33,37 +33,19 @@ pub mod echo_client {
     use dubbo::codegen::*;
     /// Echo is the echo service.
     #[derive(Debug, Clone, Default)]
-    pub struct EchoClient<T> {
-        inner: TripleClient<T>,
+    pub struct EchoClient {
+        inner: TripleClient,
     }
-    impl EchoClient<ClientBoxService> {
+    impl EchoClient {
         pub fn connect(host: String) -> Self {
             let cli = TripleClient::connect(host);
             EchoClient { inner: cli }
         }
-        pub fn build(builder: ClientBuilder) -> Self {
+        pub fn new(builder: ClientBuilder) -> Self {
             Self {
-                inner: TripleClient::with_builder(builder),
+                inner: TripleClient::new(builder),
             }
         }
-    }
-    impl<T> EchoClient<T>
-    where
-        T: Service<http::Request<hyperBody>, Response = 
http::Response<BoxBody>>,
-        T::Error: Into<StdError>,
-    {
-        pub fn new(inner: T, builder: ClientBuilder) -> Self {
-            Self {
-                inner: TripleClient::new(inner, builder),
-            }
-        }
-        pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T, 
F>>
-        where
-            F: Filter,
-        {
-            let inner = self.inner.with_filter(filter);
-            EchoClient { inner }
-        }
         /// UnaryEcho is unary echo.
         pub async fn unary_echo(
             &mut self,
@@ -71,8 +53,11 @@ pub mod echo_client {
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("UnaryEcho"));
             let path = 
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
-            self.inner.unary(request, codec, path).await
+            self.inner.unary(request, codec, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
         pub async fn server_streaming_echo(
@@ -81,10 +66,15 @@ pub mod echo_client {
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
-            self.inner.server_streaming(request, codec, path).await
+            self.inner
+                .server_streaming(request, codec, path, invocation)
+                .await
         }
         /// ClientStreamingEcho is client side streaming.
         pub async fn client_streaming_echo(
@@ -93,10 +83,15 @@ pub mod echo_client {
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
-            self.inner.client_streaming(request, codec, path).await
+            self.inner
+                .client_streaming(request, codec, path, invocation)
+                .await
         }
         /// BidirectionalStreamingEcho is bidi streaming.
         pub async fn bidirectional_streaming_echo(
@@ -105,10 +100,15 @@ pub mod echo_client {
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
-            self.inner.bidi_streaming(request, codec, path).await
+            self.inner
+                .bidi_streaming(request, codec, path, invocation)
+                .await
         }
     }
 }
diff --git a/examples/echo/src/protos/hello_echo.rs 
b/examples/echo/src/protos/hello_echo.rs
index 08a1d29..9c487e6 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -33,37 +33,19 @@ pub mod echo_client {
     use dubbo::codegen::*;
     /// Echo is the echo service.
     #[derive(Debug, Clone, Default)]
-    pub struct EchoClient<T> {
-        inner: TripleClient<T>,
+    pub struct EchoClient {
+        inner: TripleClient,
     }
-    impl EchoClient<ClientBoxService> {
+    impl EchoClient {
         pub fn connect(host: String) -> Self {
             let cli = TripleClient::connect(host);
             EchoClient { inner: cli }
         }
-        pub fn build(builder: ClientBuilder) -> Self {
+        pub fn new(builder: ClientBuilder) -> Self {
             Self {
-                inner: TripleClient::with_builder(builder),
+                inner: TripleClient::new(builder),
             }
         }
-    }
-    impl<T> EchoClient<T>
-    where
-        T: Service<http::Request<hyperBody>, Response = 
http::Response<BoxBody>>,
-        T::Error: Into<StdError>,
-    {
-        pub fn new(inner: T, builder: ClientBuilder) -> Self {
-            Self {
-                inner: TripleClient::new(inner, builder),
-            }
-        }
-        pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T, 
F>>
-        where
-            F: Filter,
-        {
-            let inner = self.inner.with_filter(filter);
-            EchoClient { inner }
-        }
         /// UnaryEcho is unary echo.
         pub async fn unary_echo(
             &mut self,
@@ -71,8 +53,11 @@ pub mod echo_client {
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("UnaryEcho"));
             let path = 
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
-            self.inner.unary(request, codec, path).await
+            self.inner.unary(request, codec, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
         pub async fn server_streaming_echo(
@@ -81,10 +66,15 @@ pub mod echo_client {
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
-            self.inner.server_streaming(request, codec, path).await
+            self.inner
+                .server_streaming(request, codec, path, invocation)
+                .await
         }
         /// ClientStreamingEcho is client side streaming.
         pub async fn client_streaming_echo(
@@ -93,10 +83,15 @@ pub mod echo_client {
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
-            self.inner.client_streaming(request, codec, path).await
+            self.inner
+                .client_streaming(request, codec, path, invocation)
+                .await
         }
         /// BidirectionalStreamingEcho is bidi streaming.
         pub async fn bidirectional_streaming_echo(
@@ -105,10 +100,15 @@ pub mod echo_client {
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
             let codec =
                 dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
+            let invocation = RpcInvocation::default()
+                
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+                .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
-            self.inner.bidi_streaming(request, codec, path).await
+            self.inner
+                .bidi_streaming(request, codec, path, invocation)
+                .await
         }
     }
 }
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index b698bc7..c4b20cc 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -22,9 +22,12 @@ prost-derive = {version = "0.10", optional = true}
 prost = "0.10.4"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
+tracing = "0.1"
+tracing-subscriber = "0.2.0"
 
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = 
"0.2.0"}
 
 [build-dependencies]
 dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/greeter/proto/greeter.proto 
b/examples/greeter/proto/greeter.proto
index 0d8be79..a0c466a 100644
--- a/examples/greeter/proto/greeter.proto
+++ b/examples/greeter/proto/greeter.proto
@@ -29,7 +29,7 @@ message GreeterReply {
   string message = 1;
 }
 
-service Greeter{
+service Greeter {
 
   // unary
   rpc greet(GreeterRequest) returns (GreeterReply);
diff --git a/examples/greeter/src/greeter/client.rs 
b/examples/greeter/src/greeter/client.rs
index 2004b6f..c493d60 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -23,10 +23,31 @@ pub mod protos {
 use dubbo::codegen::*;
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
 
 #[tokio::main]
 async fn main() {
-    let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
+    // a builder for `FmtSubscriber`.
+    let subscriber = FmtSubscriber::builder()
+        // all spans/events with a level higher than TRACE (e.g, debug, info, 
warn, etc.)
+        // will be written to stdout.
+        .with_max_level(Level::INFO)
+        // completes the builder.
+        .finish();
+
+    tracing::subscriber::set_global_default(subscriber).expect("setting 
default subscriber failed");
+
+    let mut cli = 
GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888";));
+
+    // Here is example for zk
+    // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+    //     Ok(val) => val,
+    //     Err(_) => "localhost:2181".to_string(),
+    // };
+    // let zkr = ZookeeperRegistry::new(&zk_connect_string);
+    // let directory = RegistryDirectory::new(Box::new(zkr));
+    // cli = cli.with_directory(Box::new(directory));
 
     println!("# unary call");
     let resp = cli
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
new file mode 100644
index 0000000..cf040fa
--- /dev/null
+++ b/registry-zookeeper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "dubbo-registry-zookeeper"
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+
+# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+zookeeper = "0.6.1"
+dubbo = {path = "../dubbo/", version = "0.2.0"}
+serde_json = "1.0"
+serde = {version = "1.0.145",features = ["derive"]}
+tracing = "0.1"
diff --git a/registry-zookeeper/LICENSE b/registry-zookeeper/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/registry-zookeeper/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
new file mode 100644
index 0000000..ccfce10
--- /dev/null
+++ b/registry-zookeeper/src/lib.rs
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod zookeeper_registry;
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn it_works() {
+        let result = 2 + 2;
+        assert_eq!(result, 4);
+    }
+}
diff --git a/registry-zookeeper/src/zookeeper_registry.rs 
b/registry-zookeeper/src/zookeeper_registry.rs
new file mode 100644
index 0000000..4c82634
--- /dev/null
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#![allow(unused_variables, dead_code, missing_docs)]
+
+use dubbo::common::url::Url;
+use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::NotifyListener;
+use dubbo::registry::Registry;
+use dubbo::registry::ServiceEvent;
+use dubbo::StdError;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::time::Duration;
+use tracing::info;
+
+use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+// extract service registry metadata from url
+/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+/// dubboPath = fmt.Sprintf("/%s/%s/%s", 
r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), 
common.DubboNodes[common.PROVIDER])
+
+pub const REGISTRY_GROUP_KEY: &str = "registry.group";
+
+struct LoggingWatcher;
+impl Watcher for LoggingWatcher {
+    fn handle(&self, e: WatchedEvent) {
+        println!("{:?}", e)
+    }
+}
+
+//#[derive(Debug)]
+pub struct ZookeeperRegistry {
+    root_path: String,
+    zk_client: Arc<ZooKeeper>,
+
+    listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as 
Registry>::NotifyListener>>>,
+}
+
+pub struct MyNotifyListener {}
+
+impl NotifyListener for MyNotifyListener {
+    fn notify(&self, event: dubbo::registry::ServiceEvent) {
+        todo!()
+    }
+
+    fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
+        todo!()
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ZkServiceInstance {
+    name: String,
+    address: String,
+    port: i32,
+}
+
+impl ZkServiceInstance {
+    pub fn get_service_name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    pub fn get_host(&self) -> &str {
+        self.address.as_str()
+    }
+
+    pub fn get_port(&self) -> i32 {
+        self.port
+    }
+}
+
+impl ZookeeperRegistry {
+    pub fn new(connect_string: &str) -> ZookeeperRegistry {
+        let zk_client =
+            ZooKeeper::connect(connect_string, Duration::from_secs(15), 
LoggingWatcher).unwrap();
+        ZookeeperRegistry {
+            root_path: "/services".to_string(),
+            zk_client: Arc::new(zk_client),
+
+            listeners: RwLock::new(HashMap::new()),
+        }
+    }
+
+    fn create_listener(
+        &self,
+        path: String,
+        service_name: String,
+        listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
+    ) -> ServiceInstancesChangedListener {
+        let mut service_names = HashSet::new();
+        service_names.insert(service_name.clone());
+        return ServiceInstancesChangedListener {
+            zk_client: Arc::clone(&self.zk_client),
+            path: path,
+
+            service_name: service_name.clone(),
+            listener: listener,
+        };
+    }
+
+    fn get_app_name(&self, service_name: String) -> String {
+        let res = self
+            .zk_client
+            .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
+
+        let x = res.unwrap().0;
+        let s = match std::str::from_utf8(&x) {
+            Ok(v) => v,
+            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
+        };
+        s.to_string()
+    }
+}
+
+impl Registry for ZookeeperRegistry {
+    type NotifyListener = MemoryNotifyListener;
+
+    fn register(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> 
Result<(), StdError> {
+        let binding = url.get_service_name();
+        let service_name = binding.get(0).unwrap();
+        let app_name = self.get_app_name(service_name.clone());
+        let path = self.root_path.clone() + "/" + &app_name;
+        if self.listeners.read().unwrap().get(service_name).is_some() {
+            return Ok(());
+        }
+
+        let arc_listener = Arc::new(listener);
+        self.listeners
+            .write()
+            .unwrap()
+            .insert(service_name.to_string(), Arc::clone(&arc_listener));
+
+        let zk_listener = self.create_listener(
+            path.clone(),
+            service_name.to_string(),
+            Arc::clone(&arc_listener),
+        );
+
+        let res = self.zk_client.get_children_w(&path, zk_listener);
+        let result: Vec<Url> = res
+            .unwrap()
+            .iter()
+            .map(|node_key| {
+                let zk_res = self.zk_client.get_data(
+                    &(self.root_path.clone() + "/" + &app_name + "/" + 
&node_key),
+                    false,
+                );
+                let vec_u8 = zk_res.unwrap().0;
+                let sstr = std::str::from_utf8(&vec_u8).unwrap();
+                let instance: ZkServiceInstance = 
serde_json::from_str(sstr).unwrap();
+                let url = Url::from_url(&format!(
+                    "triple://{}:{}/{}",
+                    instance.get_host(),
+                    instance.get_port(),
+                    service_name
+                ))
+                .unwrap();
+                url
+            })
+            .collect();
+
+        info!("notifing {}->{:?}", service_name, result);
+        arc_listener.notify(ServiceEvent {
+            key: service_name.to_string(),
+            action: String::from("ADD"),
+            service: result,
+        });
+        Ok(())
+    }
+
+    fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> 
Result<(), StdError> {
+        todo!()
+    }
+}
+
+pub struct ServiceInstancesChangedListener {
+    zk_client: Arc<ZooKeeper>,
+    path: String,
+
+    service_name: String,
+    listener: Arc<MemoryNotifyListener>,
+}
+
+impl Watcher for ServiceInstancesChangedListener {
+    fn handle(&self, event: WatchedEvent) {
+        if let (WatchedEventType::NodeChildrenChanged, Some(path)) = 
(event.event_type, event.path)
+        {
+            let event_path = path.clone();
+            let dirs = self
+                .zk_client
+                .get_children(&event_path.clone(), false)
+                .expect("msg");
+            let result: Vec<Url> = dirs
+                .iter()
+                .map(|node_key| {
+                    let zk_res = self
+                        .zk_client
+                        .get_data(&(event_path.clone() + "/" + node_key), 
false);
+                    let vec_u8 = zk_res.unwrap().0;
+                    let sstr = std::str::from_utf8(&vec_u8).unwrap();
+                    let instance: ZkServiceInstance = 
serde_json::from_str(sstr).unwrap();
+                    let url = Url::from_url(&format!(
+                        "triple://{}:{}/{}",
+                        instance.get_host(),
+                        instance.get_port(),
+                        self.service_name
+                    ))
+                    .unwrap();
+                    url
+                })
+                .collect();
+
+            let res = self.zk_client.get_children_w(
+                &path,
+                ServiceInstancesChangedListener {
+                    zk_client: Arc::clone(&self.zk_client),
+                    path: path.clone(),
+
+                    service_name: self.service_name.clone(),
+                    listener: Arc::clone(&self.listener),
+                },
+            );
+
+            info!("notifing {}->{:?}", self.service_name, result);
+            self.listener.notify(ServiceEvent {
+                key: self.service_name.clone(),
+                action: String::from("ADD"),
+                service: result,
+            });
+        }
+    }
+}
+
+impl NotifyListener for ServiceInstancesChangedListener {
+    fn notify(&self, event: ServiceEvent) {
+        todo!()
+    }
+
+    fn notify_all(&self, event: ServiceEvent) {
+        todo!()
+    }
+}
+
+#[test]
+fn it_works() {
+    let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
+    let zk_client =
+        ZooKeeper::connect(connect_string, Duration::from_secs(15), 
LoggingWatcher).unwrap();
+    let watcher = Arc::new(Some(TestZkWatcher {
+        watcher: Arc::new(None),
+    }));
+    watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
+    let x = watcher.as_ref().expect("");
+    zk_client.get_children_w("/test", x);
+    zk_client.delete("/test/a", None);
+    zk_client.delete("/test/b", None);
+    let zk_res = zk_client.create(
+        "/test/a",
+        vec![1, 3],
+        Acl::open_unsafe().clone(),
+        CreateMode::Ephemeral,
+    );
+    let zk_res = zk_client.create(
+        "/test/b",
+        vec![1, 3],
+        Acl::open_unsafe().clone(),
+        CreateMode::Ephemeral,
+    );
+    zk_client.close();
+}
+
+struct TestZkWatcher {
+    pub watcher: Arc<Option<TestZkWatcher>>,
+}
+
+impl Watcher for TestZkWatcher {
+    fn handle(&self, event: WatchedEvent) {
+        println!("event: {:?}", event);
+    }
+}

Reply via email to