This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch refact/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/refact/cluster by this push:
     new ab18d57  Rft: adapt nacos registry and zookeeper registry (#169)
ab18d57 is described below

commit ab18d5777ac1b84bfd513f8255893d5d9712a74f
Author: 毛文超 <[email protected]>
AuthorDate: Thu Dec 28 11:30:59 2023 +0800

    Rft: adapt nacos registry and zookeeper registry (#169)
    
    * Rft: adapt nacos registry and zookeeper registry
    
    Close #168
    
    * Rft: adapt static registry
    
    * Rft: cargo fmt
---
 dubbo/src/directory/mod.rs                        |  47 +-
 dubbo/src/framework.rs                            |  11 +-
 dubbo/src/registry/mod.rs                         |  79 ++--
 dubbo/src/registry/n_registry.rs                  | 133 +++++-
 dubbo/src/registry/protocol.rs                    |  31 +-
 dubbo/src/registry/types.rs                       |  47 +-
 dubbo/src/svc.rs                                  |   4 -
 dubbo/src/triple/client/builder.rs                |  18 +-
 examples/echo/src/generated/grpc.examples.echo.rs | 179 +++-----
 examples/greeter/src/greeter/client.rs            |  12 +-
 examples/greeter/src/greeter/server.rs            |   9 +-
 registry/nacos/Cargo.toml                         |   4 +-
 registry/nacos/src/lib.rs                         | 509 ++++++++++++----------
 registry/zookeeper/Cargo.toml                     |   5 +-
 registry/zookeeper/src/lib.rs                     | 413 +++++++++---------
 15 files changed, 773 insertions(+), 728 deletions(-)

diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index 84900aa..f0d9ebe 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -31,6 +31,7 @@ use crate::{
     svc::NewService,
     StdError,
 };
+use dubbo_base::Url;
 use dubbo_logger::tracing::debug;
 use futures_core::ready;
 use futures_util::future;
@@ -162,7 +163,11 @@ where
         let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
 
         tokio::spawn(async move {
-            let receiver = registry.subscribe(service_name).await;
+            // todo use dubbo url model generate subscribe url
+            // category:serviceInterface:version:group
+            let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888", 
service_name);
+            let subscribe_url = Url::from_url(&consumer_url).unwrap();
+            let receiver = registry.subscribe(subscribe_url).await;
             debug!("discover start!");
             match receiver {
                 Err(_e) => {
@@ -217,22 +222,32 @@ where
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
         loop {
             let pin_discover = Pin::new(&mut self.discover);
-            let change = ready!(pin_discover.poll_discover(cx))
-                .transpose()
-                .map_err(|e| e.into())?;
-            match change {
-                Some(Change::Remove(key)) => {
-                    debug!("remove key: {}", key);
-                    self.directory.remove(&key);
-                }
-                Some(Change::Insert(key, _)) => {
-                    debug!("insert key: {}", key);
-                    let invoker = self.new_invoker.new_service(key.clone());
-                    self.directory.insert(key, invoker);
+
+            match pin_discover.poll_discover(cx) {
+                Poll::Pending => {
+                    if self.directory.is_empty() {
+                        return Poll::Pending;
+                    } else {
+                        return Poll::Ready(Ok(()));
+                    }
                 }
-                None => {
-                    debug!("stream closed");
-                    return Poll::Ready(Ok(()));
+                Poll::Ready(change) => {
+                    let change = change.transpose().map_err(|e| e.into())?;
+                    match change {
+                        Some(Change::Remove(key)) => {
+                            debug!("remove key: {}", key);
+                            self.directory.remove(&key);
+                        }
+                        Some(Change::Insert(key, _)) => {
+                            debug!("insert key: {}", key);
+                            let invoker = 
self.new_invoker.new_service(key.clone());
+                            self.directory.insert(key, invoker);
+                        }
+                        None => {
+                            debug!("stream closed");
+                            return Poll::Ready(Ok(()));
+                        }
+                    }
                 }
             }
         }
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index d595f38..2666d25 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -25,9 +25,9 @@ use std::{
 use crate::{
     protocol::{BoxExporter, Protocol},
     registry::{
+        n_registry::{ArcRegistry, Registry},
         protocol::RegistryProtocol,
         types::{Registries, RegistriesOperation},
-        BoxRegistry, Registry,
     },
 };
 use dubbo_base::Url;
@@ -60,14 +60,14 @@ impl Dubbo {
         self
     }
 
-    pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) 
-> Self {
+    pub fn add_registry(mut self, registry_key: &str, registry: ArcRegistry) 
-> Self {
         if self.registries.is_none() {
             self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
         }
         self.registries
             .as_ref()
             .unwrap()
-            .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+            .insert(registry_key.to_string(), registry);
         self
     }
 
@@ -130,12 +130,13 @@ impl Dubbo {
                 async_vec.push(exporter);
                 //TODO multiple registry
                 if self.registries.is_some() {
-                    self.registries
+                    let _ = self
+                        .registries
                         .as_ref()
                         .unwrap()
                         .default_registry()
                         .register(url.clone())
-                        .unwrap();
+                        .await;
                 }
             }
         }
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index d8ff6ef..b82fda8 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -17,47 +17,46 @@
 
 #![allow(unused_variables, dead_code, missing_docs)]
 pub mod integration;
-pub mod memory_registry;
 pub mod n_registry;
 pub mod protocol;
 pub mod types;
 
-use std::{
-    fmt::{Debug, Formatter},
-    sync::Arc,
-};
-
-use dubbo_base::Url;
-
-pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 
'static>;
-pub trait Registry {
-    fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
-    fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
-
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), crate::StdError>;
-    fn unsubscribe(
-        &self,
-        url: Url,
-        listener: RegistryNotifyListener,
-    ) -> Result<(), crate::StdError>;
-}
-
-pub trait NotifyListener {
-    fn notify(&self, event: ServiceEvent);
-    fn notify_all(&self, event: ServiceEvent);
-}
-
-#[derive(Debug)]
-pub struct ServiceEvent {
-    pub key: String,
-    pub action: String,
-    pub service: Vec<Url>,
-}
-
-pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
-
-impl Debug for BoxRegistry {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str("BoxRegistry")
-    }
-}
+// use std::{
+//     fmt::{Debug, Formatter},
+//     sync::Arc,
+// };
+
+// use dubbo_base::Url;
+
+// pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 
'static>;
+// pub trait Registry {
+//     fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
+//     fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
+
+//     fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), crate::StdError>;
+//     fn unsubscribe(
+//         &self,
+//         url: Url,
+//         listener: RegistryNotifyListener,
+//     ) -> Result<(), crate::StdError>;
+// }
+
+// pub trait NotifyListener {
+//     fn notify(&self, event: ServiceEvent);
+//     fn notify_all(&self, event: ServiceEvent);
+// }
+
+// #[derive(Debug)]
+// pub struct ServiceEvent {
+//     pub key: String,
+//     pub action: String,
+//     pub service: Vec<Url>,
+// }
+
+// pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
+
+// impl Debug for BoxRegistry {
+//     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+//         f.write_str("BoxRegistry")
+//     }
+// }
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
index 928c9b3..abcd56b 100644
--- a/dubbo/src/registry/n_registry.rs
+++ b/dubbo/src/registry/n_registry.rs
@@ -1,13 +1,22 @@
-use std::sync::Arc;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use async_trait::async_trait;
 use dubbo_base::Url;
-use tokio::sync::mpsc::{channel, Receiver};
+use thiserror::Error;
+use tokio::sync::{
+    mpsc::{self, Receiver},
+    Mutex,
+};
 use tower::discover::Change;
 
 use crate::StdError;
 
-type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
 
 #[async_trait]
 pub trait Registry {
@@ -15,8 +24,7 @@ pub trait Registry {
 
     async fn unregister(&self, url: Url) -> Result<(), StdError>;
 
-    // todo service_name change to url
-    async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError>;
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
 
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
 }
@@ -27,13 +35,19 @@ pub struct ArcRegistry {
 }
 
 pub enum RegistryComponent {
-    NacosRegistry,
+    NacosRegistry(ArcRegistry),
     ZookeeperRegistry,
     StaticRegistry(StaticRegistry),
 }
 
+pub struct StaticServiceValues {
+    listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
+    urls: HashSet<String>,
+}
+
+#[derive(Default)]
 pub struct StaticRegistry {
-    urls: Vec<Url>,
+    urls: Mutex<HashMap<String, StaticServiceValues>>,
 }
 
 impl ArcRegistry {
@@ -54,8 +68,8 @@ impl Registry for ArcRegistry {
         self.inner.unregister(url).await
     }
 
-    async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
-        self.inner.subscribe(service_name).await
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+        self.inner.subscribe(url).await
     }
 
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
@@ -73,11 +87,11 @@ impl Registry for RegistryComponent {
         todo!()
     }
 
-    async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
         match self {
-            RegistryComponent::NacosRegistry => todo!(),
+            RegistryComponent::NacosRegistry(registry) => 
registry.subscribe(url).await,
             RegistryComponent::ZookeeperRegistry => todo!(),
-            RegistryComponent::StaticRegistry(registry) => 
registry.subscribe(service_name).await,
+            RegistryComponent::StaticRegistry(registry) => 
registry.subscribe(url).await,
         }
     }
 
@@ -88,31 +102,102 @@ impl Registry for RegistryComponent {
 
 impl StaticRegistry {
     pub fn new(urls: Vec<Url>) -> Self {
-        Self { urls }
+        let mut map = HashMap::with_capacity(urls.len());
+
+        for url in urls {
+            let service_name = url.get_service_name();
+            let static_values = map
+                .entry(service_name)
+                .or_insert_with(|| StaticServiceValues {
+                    listeners: Vec::new(),
+                    urls: HashSet::new(),
+                });
+            let url = url.to_string();
+            static_values.urls.insert(url.clone());
+        }
+
+        Self {
+            urls: Mutex::new(map),
+        }
     }
 }
 
 #[async_trait]
 impl Registry for StaticRegistry {
     async fn register(&self, url: Url) -> Result<(), StdError> {
-        todo!()
+        let service_name = url.get_service_name();
+        let mut lock = self.urls.lock().await;
+
+        let static_values = lock
+            .entry(service_name)
+            .or_insert_with(|| StaticServiceValues {
+                listeners: Vec::new(),
+                urls: HashSet::new(),
+            });
+        let url = url.to_string();
+        static_values.urls.insert(url.clone());
+
+        static_values.listeners.retain(|listener| {
+            let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(), 
())));
+            ret.is_ok()
+        });
+
+        Ok(())
     }
 
     async fn unregister(&self, url: Url) -> Result<(), StdError> {
-        todo!()
-    }
-
-    async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
-        let (tx, rx) = channel(self.urls.len());
-        for url in self.urls.iter() {
-            let change = Ok(Change::Insert(url.to_url(), ()));
-            tx.send(change).await?;
+        let service_name = url.get_service_name();
+        let mut lock = self.urls.lock().await;
+
+        match lock.get_mut(&service_name) {
+            None => Ok(()),
+            Some(static_values) => {
+                let url = url.to_string();
+                static_values.urls.remove(&url);
+                static_values.listeners.retain(|listener| {
+                    let ret = 
listener.try_send(Ok(ServiceChange::Remove(url.clone())));
+                    ret.is_ok()
+                });
+                if static_values.urls.is_empty() {
+                    lock.remove(&service_name);
+                }
+                Ok(())
+            }
         }
+    }
 
-        Ok(rx)
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+        let service_name = url.get_service_name();
+
+        let change_rx = {
+            let mut lock = self.urls.lock().await;
+            let static_values = lock
+                .entry(service_name)
+                .or_insert_with(|| StaticServiceValues {
+                    listeners: Vec::new(),
+                    urls: HashSet::new(),
+                });
+
+            let (tx, change_rx) = mpsc::channel(64);
+            static_values.listeners.push(tx);
+
+            for url in static_values.urls.iter() {
+                static_values.listeners.retain(|listener| {
+                    let ret = 
listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
+                    ret.is_ok()
+                });
+            }
+            change_rx
+        };
+
+        Ok(change_rx)
     }
 
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
-        todo!()
+        Ok(())
     }
 }
+
+#[derive(Error, Debug)]
+#[error("static registry error: {0}")]
+struct StaticRegistryError(String);
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 1250950..b9ba722 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -19,11 +19,10 @@ use dubbo_base::Url;
 use dubbo_logger::tracing;
 use std::{
     collections::HashMap,
-    fmt::{Debug, Formatter},
-    sync::{Arc, Mutex, RwLock},
+    sync::{Arc, RwLock},
 };
 
-use super::{memory_registry::MemoryRegistry, BoxRegistry};
+use super::n_registry::{ArcRegistry, Registry, StaticRegistry};
 use crate::{
     protocol::{
         triple::{triple_exporter::TripleExporter, 
triple_protocol::TripleProtocol},
@@ -42,19 +41,6 @@ pub struct RegistryProtocol {
     services: HashMap<String, Vec<Url>>,
 }
 
-impl Debug for RegistryProtocol {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str(
-            format!(
-                "RegistryProtocol services{:#?},registries,{:#?}",
-                self.services,
-                self.registries.clone()
-            )
-            .as_str(),
-        )
-    }
-}
-
 impl RegistryProtocol {
     pub fn new() -> Self {
         RegistryProtocol {
@@ -74,16 +60,17 @@ impl RegistryProtocol {
         self
     }
 
-    pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
-        let mem = MemoryRegistry::default();
+    pub fn get_registry(&mut self, url: Url) -> ArcRegistry {
+        let mem = StaticRegistry::default();
+        let mem = ArcRegistry::new(mem);
         self.registries
             .as_ref()
             .unwrap()
             .lock()
             .unwrap()
-            .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
+            .insert(url.location, mem.clone());
 
-        Box::new(mem)
+        mem
     }
 }
 
@@ -105,8 +92,8 @@ impl Protocol for RegistryProtocol {
         if let Some(urls) = registry_url {
             for url in urls.clone().iter() {
                 if !url.service_key.is_empty() {
-                    let mut reg = self.get_registry(url.clone());
-                    reg.register(url.clone()).unwrap();
+                    let reg = self.get_registry(url.clone());
+                    let _ = reg.register(url.clone()).await;
                 }
             }
         }
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
index ae7c7ca..5c1687d 100644
--- a/dubbo/src/registry/types.rs
+++ b/dubbo/src/registry/types.rs
@@ -20,30 +20,22 @@ use std::{
     sync::{Arc, Mutex},
 };
 
-use dubbo_base::Url;
-use dubbo_logger::tracing::info;
 use itertools::Itertools;
 
-use crate::{
-    registry::{BoxRegistry, Registry},
-    StdError,
-};
-
-use super::RegistryNotifyListener;
+use super::n_registry::ArcRegistry;
 
-pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
-pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+pub type Registries = Arc<Mutex<HashMap<String, ArcRegistry>>>;
 
 pub const DEFAULT_REGISTRY_KEY: &str = "default";
 
 pub trait RegistriesOperation {
-    fn get(&self, registry_key: &str) -> SafeRegistry;
-    fn insert(&self, registry_key: String, registry: SafeRegistry);
-    fn default_registry(&self) -> SafeRegistry;
+    fn get(&self, registry_key: &str) -> ArcRegistry;
+    fn insert(&self, registry_key: String, registry: ArcRegistry);
+    fn default_registry(&self) -> ArcRegistry;
 }
 
 impl RegistriesOperation for Registries {
-    fn get(&self, registry_key: &str) -> SafeRegistry {
+    fn get(&self, registry_key: &str) -> ArcRegistry {
         self.as_ref()
             .lock()
             .unwrap()
@@ -52,11 +44,11 @@ impl RegistriesOperation for Registries {
             .clone()
     }
 
-    fn insert(&self, registry_key: String, registry: SafeRegistry) {
+    fn insert(&self, registry_key: String, registry: ArcRegistry) {
         self.as_ref().lock().unwrap().insert(registry_key, registry);
     }
 
-    fn default_registry(&self) -> SafeRegistry {
+    fn default_registry(&self) -> ArcRegistry {
         let guard = self.as_ref().lock().unwrap();
         let (_, result) = guard
             .iter()
@@ -66,26 +58,3 @@ impl RegistriesOperation for Registries {
         result.clone()
     }
 }
-
-impl Registry for SafeRegistry {
-    fn register(&mut self, url: Url) -> Result<(), StdError> {
-        info!("register {}.", url);
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-
-    fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), StdError> {
-        self.lock().unwrap().register(url).expect("registry err.");
-        Ok(())
-    }
-}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
index df4c071..db59b92 100644
--- a/dubbo/src/svc.rs
+++ b/dubbo/src/svc.rs
@@ -1,9 +1,5 @@
 use std::{marker::PhantomData, sync::Arc};
 
-
-
-
-
 pub trait NewService<T> {
     type Service;
 
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index c4e6e62..0dadbcc 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -59,9 +59,10 @@ impl ClientBuilder {
         Self {
             timeout: None,
             connector: "",
-            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
-                StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
-            ))),
+            registry: 
Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
+                host,
+            )
+            .unwrap()]))),
             direct: true,
             host: host.to_string(),
         }
@@ -74,18 +75,19 @@ impl ClientBuilder {
         }
     }
 
-    pub fn with_registry(self, registry: RegistryComponent) -> Self {
+    pub fn with_registry(self, registry: ArcRegistry) -> Self {
         Self {
-            registry: Some(ArcRegistry::new(registry)),
+            registry: Some(registry),
             ..self
         }
     }
 
     pub fn with_host(self, host: &'static str) -> Self {
         Self {
-            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
-                StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
-            ))),
+            registry: 
Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
+                host,
+            )
+            .unwrap()]))),
             ..self
         }
     }
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs 
b/examples/echo/src/generated/grpc.examples.echo.rs
index 07c58fe..4521c9f 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -36,16 +36,12 @@ pub mod echo_client {
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("UnaryEcho"));
-            let path = http::uri::PathAndQuery::from_static(
-                "/grpc.examples.echo.Echo/UnaryEcho",
-            );
+            let path = 
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
             self.inner.unary(request, codec, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
@@ -53,51 +49,51 @@ pub mod echo_client {
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
-            self.inner.server_streaming(request, codec, path, invocation).await
+            self.inner
+                .server_streaming(request, codec, path, invocation)
+                .await
         }
         /// ClientStreamingEcho is client side streaming.
         pub async fn client_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
-            self.inner.client_streaming(request, codec, path, invocation).await
+            self.inner
+                .client_streaming(request, codec, path, invocation)
+                .await
         }
         /// BidirectionalStreamingEcho is bidi streaming.
         pub async fn bidirectional_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, 
dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, 
super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
-            self.inner.bidi_streaming(request, codec, path, invocation).await
+            self.inner
+                .bidi_streaming(request, codec, path, invocation)
+                .await
         }
     }
 }
@@ -114,9 +110,7 @@ pub mod echo_server {
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the ServerStreamingEcho method.
-        type ServerStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type ServerStreamingEchoStream: futures_util::Stream<Item = 
Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// ServerStreamingEcho is server side streaming.
@@ -130,19 +124,14 @@ pub mod echo_server {
             request: Request<Decoding<super::EchoRequest>>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the BidirectionalStreamingEcho 
method.
-        type BidirectionalStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type BidirectionalStreamingEchoStream: futures_util::Stream<Item = 
Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// BidirectionalStreamingEcho is bidi streaming.
         async fn bidirectional_streaming_echo(
             &self,
             request: Request<Decoding<super::EchoRequest>>,
-        ) -> Result<
-            Response<Self::BidirectionalStreamingEchoStream>,
-            dubbo::status::Status,
-        >;
+        ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, 
dubbo::status::Status>;
     }
     /// Echo is the echo service.
     #[derive(Debug)]
@@ -172,10 +161,7 @@ pub mod echo_server {
         type Response = http::Response<BoxBody>;
         type Error = std::convert::Infallible;
         type Future = BoxFuture<Self::Response, Self::Error>;
-        fn poll_ready(
-            &mut self,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
+        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -188,26 +174,18 @@ pub mod echo_server {
                     }
                     impl<T: Echo> UnarySvc<super::EchoRequest> for 
UnaryEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future = BoxFuture<Response<Self::Response>, 
dubbo::status::Status>;
+                        fn call(&mut self, request: 
Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
                             let fut = async move { 
inner.unary_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = 
TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server.unary(UnaryEchoServer { inner }, 
req).await;
                         Ok(res)
                     };
@@ -218,32 +196,22 @@ pub mod echo_server {
                     struct ServerStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
-                    for ServerStreamingEchoServer<T> {
+                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for 
ServerStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::ServerStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, 
dubbo::status::Status>;
+                        fn call(&mut self, request: 
Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.server_streaming_echo(request).await
-                            };
+                            let fut = async move { 
inner.server_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = 
TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .server_streaming(ServerStreamingEchoServer { 
inner }, req)
                             .await;
@@ -256,31 +224,23 @@ pub mod echo_server {
                     struct ClientStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
-                    for ClientStreamingEchoServer<T> {
+                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for 
ClientStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
+                        type Future = BoxFuture<Response<Self::Response>, 
dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.client_streaming_echo(request).await
-                            };
+                            let fut = async move { 
inner.client_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = 
TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .client_streaming(ClientStreamingEchoServer { 
inner }, req)
                             .await;
@@ -293,56 +253,41 @@ pub mod echo_server {
                     struct BidirectionalStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> StreamingSvc<super::EchoRequest>
-                    for BidirectionalStreamingEchoServer<T> {
+                    impl<T: Echo> StreamingSvc<super::EchoRequest> for 
BidirectionalStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = 
T::BidirectionalStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, 
dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                
inner.bidirectional_streaming_echo(request).await
-                            };
+                            let fut =
+                                async move { 
inner.bidirectional_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = 
TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
-                            .bidi_streaming(
-                                BidirectionalStreamingEchoServer {
-                                    inner,
-                                },
-                                req,
-                            )
+                            .bidi_streaming(BidirectionalStreamingEchoServer { 
inner }, req)
                             .await;
                         Ok(res)
                     };
                     Box::pin(fut)
                 }
-                _ => {
-                    Box::pin(async move {
-                        Ok(
-                            http::Response::builder()
-                                .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
-                                .body(empty_body())
-                                .unwrap(),
-                        )
-                    })
-                }
+                _ => Box::pin(async move {
+                    Ok(http::Response::builder()
+                        .status(200)
+                        .header("grpc-status", "12")
+                        .header("content-type", "application/grpc")
+                        .body(empty_body())
+                        .unwrap())
+                }),
             }
         }
     }
diff --git a/examples/greeter/src/greeter/client.rs 
b/examples/greeter/src/greeter/client.rs
index afa1b20..cb92ed0 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,17 +21,21 @@ pub mod protos {
 }
 
 use std::env;
- 
-use dubbo::codegen::*;
 
+use dubbo::{codegen::*, registry::n_registry::ArcRegistry};
+
+use dubbo_base::Url;
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
+use registry_nacos::NacosRegistry;
 
 #[tokio::main]
 async fn main() {
     dubbo_logger::init();
- 
-    let builder = ClientBuilder::new().with_host("http://127.0.0.1:8888";);
+
+    let builder = 
ClientBuilder::new().with_registry(ArcRegistry::new(NacosRegistry::new(
+        Url::from_url("nacos://127.0.0.1:8848").unwrap(),
+    )));
 
     let mut cli = GreeterClient::new(builder);
 
diff --git a/examples/greeter/src/greeter/server.rs 
b/examples/greeter/src/greeter/server.rs
index c3bc4c5..fd436e5 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -18,11 +18,13 @@
 use std::{io::ErrorKind, pin::Pin};
 
 use async_trait::async_trait;
+use dubbo_base::Url;
 use futures_util::{Stream, StreamExt};
+use registry_nacos::NacosRegistry;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 
-use dubbo::{codegen::*, Dubbo, registry::memory_registry::MemoryRegistry};
+use dubbo::{codegen::*, registry::n_registry::ArcRegistry, Dubbo};
 use dubbo_config::RootConfig;
 use dubbo_logger::{
     tracing::{info, span},
@@ -56,10 +58,11 @@ async fn main() {
         Ok(config) => config,
         Err(_err) => panic!("err: {:?}", _err), // response was droped
     };
+
+    let nacos_registry = 
NacosRegistry::new(Url::from_url("nacos://127.0.0.1:8848").unwrap());
     let mut f = Dubbo::new()
         .with_config(r)
-        .add_registry("memory_registry", Box::new(MemoryRegistry::new()));
-
+        .add_registry("nacos-registry", ArcRegistry::new(nacos_registry));
 
     f.start().await;
 }
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 798253a..1fa6c19 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -9,13 +9,15 @@ repository = "https://github.com/apache/dubbo-rust.git";
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http"] }
+nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http", "async"] }
 dubbo.workspace = true
 serde_json.workspace = true
 serde = { workspace = true, features = ["derive"] }
 anyhow.workspace = true
 dubbo-logger.workspace = true
 dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
 
 [dev-dependencies]
 tracing-subscriber = "0.3.16"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index bbbaaa2..ad34237 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -16,16 +16,20 @@
  */
 mod utils;
 
+use async_trait::async_trait;
 use dubbo_base::Url;
-use std::{
-    collections::{HashMap, HashSet},
-    sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, sync::Arc};
+use tokio::{select, sync::mpsc};
 
 use anyhow::anyhow;
-use dubbo::registry::{NotifyListener, Registry, RegistryNotifyListener, 
ServiceEvent};
-use dubbo_logger::tracing::{error, info, warn};
-use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, 
ServiceInstance};
+use dubbo::{
+    registry::n_registry::{DiscoverStream, Registry, ServiceChange},
+    StdError,
+};
+use dubbo_logger::tracing::{debug, error, info};
+use nacos_sdk::api::naming::{
+    NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance,
+};
 
 use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str, 
match_range};
 
@@ -60,7 +64,6 @@ const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
 
 pub struct NacosRegistry {
     nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
-    listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
 }
 
 impl NacosRegistry {
@@ -77,49 +80,6 @@ impl NacosRegistry {
 
         Self {
             nacos_naming_service: Arc::new(nacos_naming_service),
-            listeners: Mutex::new(HashMap::new()),
-        }
-    }
-
-    #[allow(dead_code)]
-    fn get_subscribe_service_names(&self, service_name: &NacosServiceName) -> 
HashSet<String> {
-        if service_name.is_concrete() {
-            let mut set = HashSet::new();
-            let service_subscribe_name = service_name.to_subscriber_str();
-            let service_subscriber_legacy_name = 
service_name.to_subscriber_legacy_string();
-            if service_subscribe_name.eq(&service_subscriber_legacy_name) {
-                set.insert(service_subscribe_name);
-            } else {
-                set.insert(service_subscribe_name);
-                set.insert(service_subscriber_legacy_name);
-            }
-
-            set
-        } else {
-            let list_view = self.nacos_naming_service.get_service_list(
-                1,
-                i32::MAX,
-                Some(
-                    service_name
-                        .get_group_with_default(DEFAULT_GROUP)
-                        .to_string(),
-                ),
-            );
-            if let Err(e) = list_view {
-                error!("list service instances occur an error: {:?}", e);
-                return HashSet::default();
-            }
-
-            let list_view = list_view.unwrap();
-            let set: HashSet<String> = list_view
-                .0
-                .into_iter()
-                .filter(|service_name| 
service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
-                .map(|service_name| 
NacosServiceName::from_service_name_str(&service_name))
-                .filter(|other_service_name| 
service_name.is_compatible(other_service_name))
-                .map(|service_name| service_name.to_subscriber_str())
-                .collect();
-            set
         }
     }
 }
@@ -135,20 +95,66 @@ impl NacosRegistry {
             ..Default::default()
         }
     }
+
+    fn diff<'a>(
+        old_service: &'a Vec<ServiceInstance>,
+        new_services: &'a Vec<ServiceInstance>,
+    ) -> (Vec<&'a ServiceInstance>, Vec<&'a ServiceInstance>) {
+        let new_hosts_map: HashMap<String, &ServiceInstance> = new_services
+            .iter()
+            .map(|hosts| (hosts.ip_and_port(), hosts))
+            .collect();
+
+        let old_hosts_map: HashMap<String, &ServiceInstance> = old_service
+            .iter()
+            .map(|hosts| (hosts.ip_and_port(), hosts))
+            .collect();
+
+        let mut add_hosts = Vec::<&ServiceInstance>::new();
+        let mut removed_hosts = Vec::<&ServiceInstance>::new();
+
+        for (key, new_host) in new_hosts_map.iter() {
+            let old_host = old_hosts_map.get(key);
+            match old_host {
+                None => {
+                    add_hosts.push(*new_host);
+                }
+                Some(old_host) => {
+                    if !old_host.is_same_instance(new_host) {
+                        removed_hosts.push(*old_host);
+                        add_hosts.push(*new_host);
+                    }
+                }
+            }
+        }
+
+        for (key, old_host) in old_hosts_map.iter() {
+            let new_host = new_hosts_map.get(key);
+            match new_host {
+                None => {
+                    removed_hosts.push(*old_host);
+                }
+                Some(_) => {}
+            }
+        }
+
+        (removed_hosts, add_hosts)
+    }
 }
 
+#[async_trait]
 impl Registry for NacosRegistry {
-    fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
-        let side = url.get_param(SIDE_KEY).unwrap_or_default();
-        let register_consumer = url
-            .get_param(REGISTER_CONSUMER_URL_KEY)
-            .unwrap_or_else(|| false.to_string())
-            .parse::<bool>()
-            .unwrap_or(false);
-        if side.ne(PROVIDER_SIDE) && !register_consumer {
-            warn!("Please set 
'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url 
registration.");
-            return Ok(());
-        }
+    async fn register(&self, url: Url) -> Result<(), dubbo::StdError> {
+        // let side = url.get_param(SIDE_KEY).unwrap_or_default();
+        // let register_consumer = url
+        //     .get_param(REGISTER_CONSUMER_URL_KEY)
+        //     .unwrap_or_else(|| false.to_string())
+        //     .parse::<bool>()
+        //     .unwrap_or(false);
+        // if side.ne(PROVIDER_SIDE) && !register_consumer {
+        //     warn!("Please set 
'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url 
registration.");
+        //     return Ok(());
+        // }
 
         let nacos_service_name = NacosServiceName::new(&url);
 
@@ -162,11 +168,10 @@ impl Registry for NacosRegistry {
         let nacos_service_instance = Self::create_nacos_service_instance(url);
 
         info!("register service: {}", nacos_service_name);
-        let ret = self.nacos_naming_service.register_instance(
-            nacos_service_name,
-            group_name,
-            nacos_service_instance,
-        );
+        let ret = self
+            .nacos_naming_service
+            .register_instance(nacos_service_name, group_name, 
nacos_service_instance)
+            .await;
         if let Err(e) = ret {
             error!("register to nacos occur an error: {:?}", e);
             return Err(anyhow!("register to nacos occur an error: {:?}", 
e).into());
@@ -175,7 +180,7 @@ impl Registry for NacosRegistry {
         Ok(())
     }
 
-    fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
+    async fn unregister(&self, url: Url) -> Result<(), dubbo::StdError> {
         let nacos_service_name = NacosServiceName::new(&url);
 
         let group_name = Some(
@@ -189,11 +194,10 @@ impl Registry for NacosRegistry {
 
         info!("deregister service: {}", nacos_service_name);
 
-        let ret = self.nacos_naming_service.deregister_instance(
-            nacos_service_name,
-            group_name,
-            nacos_service_instance,
-        );
+        let ret = self
+            .nacos_naming_service
+            .deregister_instance(nacos_service_name, group_name, 
nacos_service_instance)
+            .await;
         if let Err(e) = ret {
             error!("deregister service from nacos occur an error: {:?}", e);
             return Err(anyhow!("deregister service from nacos occur an error: 
{:?}", e).into());
@@ -201,101 +205,161 @@ impl Registry for NacosRegistry {
         Ok(())
     }
 
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), dubbo::StdError> {
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
         let service_name = NacosServiceName::new(&url);
-        let url_str = url.to_url();
-
-        info!("subscribe: {}", &url_str);
+        let service_group = service_name
+            .get_group_with_default(DEFAULT_GROUP)
+            .to_string();
+        let subscriber_url = service_name.to_subscriber_str();
+        info!("subscribe: {}", subscriber_url);
+
+        let (listener, mut change_receiver) = ServiceChangeListener::new();
+        let arc_listener = Arc::new(listener);
+
+        let (discover_tx, discover_rx) = mpsc::channel(64);
+
+        let nacos_naming_service = self.nacos_naming_service.clone();
+
+        let listener_in_task = arc_listener.clone();
+        let service_group_in_task = service_group.clone();
+        let subscriber_url_in_task = subscriber_url.clone();
+        tokio::spawn(async move {
+            let listener = listener_in_task;
+            let service_group = service_group_in_task;
+            let subscriber_url = subscriber_url_in_task;
+
+            let mut current_instances = Vec::new();
+            loop {
+                let change = select! {
+                    _ = discover_tx.closed() => {
+                        debug!("service {} change task quit, unsubscribe.", 
subscriber_url);
+                        None
+                    },
+                    change = change_receiver.recv() => change
+                };
+
+                match change {
+                    Some(instances) => {
+                        debug!("service {} changed", subscriber_url);
+                        let (remove_instances, add_instances) =
+                            NacosRegistry::diff(&current_instances, 
&instances);
+
+                        for instance in remove_instances {
+                            let service_name = instance.service_name.as_ref();
+                            let url = match service_name {
+                                None => {
+                                    format!("triple://{}:{}", instance.ip(), 
instance.port())
+                                }
+                                Some(service_name) => {
+                                    format!(
+                                        "triple://{}:{}/{}",
+                                        instance.ip(),
+                                        instance.port(),
+                                        service_name
+                                    )
+                                }
+                            };
+
+                            match 
discover_tx.send(Ok(ServiceChange::Remove(url))).await {
+                                Ok(_) => {}
+                                Err(e) => {
+                                    error!(
+                                        "send service change failed: {:?}, 
maybe user unsubscribe",
+                                        e
+                                    );
+                                    break;
+                                }
+                            }
+                        }
+
+                        for instance in add_instances {
+                            let service_name = instance.service_name.as_ref();
+                            let url = match service_name {
+                                None => {
+                                    format!("triple://{}:{}", instance.ip(), 
instance.port())
+                                }
+                                Some(service_name) => {
+                                    format!(
+                                        "triple://{}:{}/{}",
+                                        instance.ip(),
+                                        instance.port(),
+                                        service_name
+                                    )
+                                }
+                            };
+
+                            match 
discover_tx.send(Ok(ServiceChange::Insert(url, ()))).await {
+                                Ok(_) => {}
+                                Err(e) => {
+                                    error!(
+                                        "send service change failed: {:?}, 
maybe user unsubscribe",
+                                        e
+                                    );
+                                    break;
+                                }
+                            }
+                        }
+                        current_instances = instances;
+                    }
+                    None => {
+                        error!(
+                            "receive service change task quit, unsubscribe 
{}.",
+                            subscriber_url
+                        );
+                        break;
+                    }
+                }
+            }
 
-        let nacos_listener: Arc<NotifyListenerWrapper> = {
-            let listeners = self.listeners.lock();
-            if let Err(e) = listeners {
+            debug!("unsubscribe service: {}", subscriber_url);
+            // unsubscribe
+            let unsubscribe = nacos_naming_service
+                .unsubscribe(subscriber_url, Some(service_group), Vec::new(), 
listener)
+                .await;
+
+            match unsubscribe {
+                Ok(_) => {}
+                Err(e) => {
+                    error!("unsubscribe service failed: {:?}", e);
+                }
+            }
+        });
+
+        let all_instance = self
+            .nacos_naming_service
+            .get_all_instances(
+                subscriber_url.clone(),
+                Some(service_group.clone()),
+                Vec::new(),
+                false,
+            )
+            .await?;
+        let _ = arc_listener.changed(all_instance);
+
+        match self
+            .nacos_naming_service
+            .subscribe(
+                subscriber_url.clone(),
+                Some(service_group.clone()),
+                Vec::new(),
+                arc_listener,
+            )
+            .await
+        {
+            Ok(_) => {}
+            Err(e) => {
                 error!("subscribe service failed: {:?}", e);
                 return Err(anyhow!("subscribe service failed: {:?}", 
e).into());
             }
-
-            let mut listeners = listeners.unwrap();
-            let listener_set = listeners.get_mut(url_str.as_str());
-
-            let wrapper = Arc::new(NotifyListenerWrapper(listener));
-            if let Some(listener_set) = listener_set {
-                listener_set.insert(wrapper.clone());
-            } else {
-                let mut hash_set = HashSet::new();
-                hash_set.insert(wrapper.clone());
-                listeners.insert(url_str, hash_set);
-            }
-
-            wrapper
-        };
-
-        let ret = self.nacos_naming_service.subscribe(
-            service_name.to_subscriber_str(),
-            Some(
-                service_name
-                    .get_group_with_default(DEFAULT_GROUP)
-                    .to_string(),
-            ),
-            Vec::new(),
-            nacos_listener,
-        );
-
-        if let Err(e) = ret {
-            error!("subscribe service failed: {:?}", e);
-            return Err(anyhow!("subscribe service failed: {:?}", e).into());
         }
 
-        Ok(())
+        Ok(discover_rx)
     }
 
-    fn unsubscribe(
-        &self,
-        url: Url,
-        listener: RegistryNotifyListener,
-    ) -> Result<(), dubbo::StdError> {
+    async fn unsubscribe(&self, url: Url) -> Result<(), dubbo::StdError> {
         let service_name = NacosServiceName::new(&url);
-        let url_str = url.to_url();
-        info!("unsubscribe: {}", &url_str);
-
-        let nacos_listener: Arc<NotifyListenerWrapper> = {
-            let listeners = self.listeners.lock();
-            if let Err(e) = listeners {
-                error!("unsubscribe service failed: {:?}", e);
-                return Err(anyhow!("unsubscribe service failed: {:?}", 
e).into());
-            }
-
-            let mut listeners = listeners.unwrap();
-            let listener_set = listeners.get_mut(url_str.as_str());
-            if listener_set.is_none() {
-                return Ok(());
-            }
-
-            let listener_set = listener_set.unwrap();
-
-            let listener = Arc::new(NotifyListenerWrapper(listener));
-            let listener = listener_set.take(&listener);
-            if listener.is_none() {
-                return Ok(());
-            }
-
-            listener.unwrap()
-        };
-
-        let ret = self.nacos_naming_service.unsubscribe(
-            service_name.to_subscriber_str(),
-            Some(
-                service_name
-                    .get_group_with_default(DEFAULT_GROUP)
-                    .to_string(),
-            ),
-            Vec::new(),
-            nacos_listener,
-        );
-
-        if let Err(e) = ret {
-            error!("unsubscribe service failed: {:?}", e);
-            return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
-        }
+        let subscriber_url = service_name.to_subscriber_str();
+        info!("unsubscribe: {}", &subscriber_url);
 
         Ok(())
     }
@@ -484,52 +548,43 @@ impl NacosServiceName {
     }
 }
 
-struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
-
-impl std::hash::Hash for NotifyListenerWrapper {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        let ptr = self.0.as_ref();
-        std::ptr::hash(ptr, state);
-    }
+struct ServiceChangeListener {
+    tx: mpsc::Sender<Vec<ServiceInstance>>,
 }
 
-impl PartialEq for NotifyListenerWrapper {
-    fn eq(&self, other: &Self) -> bool {
-        let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
-        let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
+impl ServiceChangeListener {
+    pub fn new() -> (Self, mpsc::Receiver<Vec<ServiceInstance>>) {
+        let (tx, rx) = mpsc::channel(64);
+        let this = Self { tx };
 
-        let (self_data_ptr, _): (*const u8, *const u8) = unsafe { 
std::mem::transmute(self_ptr) };
+        (this, rx)
+    }
 
-        let (other_data_ptr, _): (*const u8, *const u8) = unsafe { 
std::mem::transmute(other_ptr) };
-        self_data_ptr == other_data_ptr
+    pub fn changed(&self, instances: Vec<ServiceInstance>) -> Result<(), 
dubbo::StdError> {
+        match self.tx.try_send(instances) {
+            Ok(_) => Ok(()),
+            Err(e) => {
+                error!("send service change failed: {:?}", e);
+                Err(anyhow!("send service change failed: {:?}", e).into())
+            }
+        }
     }
 }
 
-impl Eq for NotifyListenerWrapper {}
-
-impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
+impl NamingEventListener for ServiceChangeListener {
     fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
-        let service_name = event.service_name.clone();
+        debug!("service change {}", event.service_name.clone());
+        debug!("nacos event: {:?}", event);
+
         let instances = event.instances.as_ref();
-        let urls: Vec<Url>;
-        if let Some(instances) = instances {
-            urls = instances
-                .iter()
-                .filter_map(|data| {
-                    let url_str =
-                        format!("triple://{}:{}/{}", data.ip(), data.port(), 
service_name);
-                    Url::from_url(&url_str)
-                })
-                .collect();
-        } else {
-            urls = Vec::new();
+        match instances {
+            None => {
+                let _ = self.changed(Vec::default());
+            }
+            Some(instances) => {
+                let _ = self.changed(instances.clone());
+            }
         }
-        let notify_event = ServiceEvent {
-            key: service_name,
-            action: String::from("CHANGE"),
-            service: urls,
-        };
-        self.0.notify(notify_event);
     }
 }
 
@@ -543,9 +598,9 @@ pub mod tests {
 
     use super::*;
 
-    #[test]
+    #[tokio::test]
     #[ignore]
-    pub fn test_register_to_nacos() {
+    pub async fn test_register_to_nacos() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -556,14 +611,14 @@ pub mod tests {
             .init();
 
         let nacos_registry_url = 
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let registry = NacosRegistry::new(nacos_registry_url);
 
         let mut service_url = 
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
         service_url
             .params
             .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
 
-        let ret = registry.register(service_url);
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
@@ -571,9 +626,9 @@ pub mod tests {
         thread::sleep(sleep_millis);
     }
 
-    #[test]
+    #[tokio::test]
     #[ignore]
-    pub fn test_register_and_unregister() {
+    pub async fn test_register_and_unregister() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -584,14 +639,14 @@ pub mod tests {
             .init();
 
         let nacos_registry_url = 
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let registry = NacosRegistry::new(nacos_registry_url);
 
         let mut service_url = 
Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
         service_url
             .params
             .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
 
-        let ret = registry.register(service_url);
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
@@ -599,7 +654,7 @@ pub mod tests {
         thread::sleep(sleep_millis);
 
         let unregister_url = 
Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
-        let ret = registry.unregister(unregister_url);
+        let ret = registry.unregister(unregister_url).await;
 
         info!("deregister result: {:?}", ret);
 
@@ -607,20 +662,9 @@ pub mod tests {
         thread::sleep(sleep_millis);
     }
 
-    struct TestNotifyListener;
-    impl NotifyListener for TestNotifyListener {
-        fn notify(&self, event: ServiceEvent) {
-            info!("notified: {:?}", event.key);
-        }
-
-        fn notify_all(&self, event: ServiceEvent) {
-            info!("notify_all: {:?}", event.key);
-        }
-    }
-
-    #[test]
+    #[tokio::test]
     #[ignore]
-    fn test_subscribe() {
+    pub async fn test_subscribe() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -631,33 +675,36 @@ pub mod tests {
             .init();
 
         let nacos_registry_url = 
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let registry = NacosRegistry::new(nacos_registry_url);
 
         let mut service_url = 
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
         service_url
             .params
             .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
 
-        let ret = registry.register(service_url);
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
-        let subscribe_url = 
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-
-        let ret = registry.subscribe(subscribe_url, 
Arc::new(TestNotifyListener));
+        let subscribe_url = 
Url::from_url("consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+        let subscribe_ret = registry.subscribe(subscribe_url).await;
 
-        if let Err(e) = ret {
+        if let Err(e) = subscribe_ret {
             error!("error message: {:?}", e);
             return;
         }
 
+        let mut rx = subscribe_ret.unwrap();
+        let change = rx.recv().await;
+        info!("receive change: {:?}", change);
+
         let sleep_millis = time::Duration::from_secs(300);
         thread::sleep(sleep_millis);
     }
 
-    #[test]
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     #[ignore]
-    fn test_unsubscribe() {
+    pub async fn test_unsubscribe() {
         tracing_subscriber::fmt()
             .with_thread_names(true)
             .with_file(true)
@@ -668,33 +715,35 @@ pub mod tests {
             .init();
 
         let nacos_registry_url = 
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
-        let mut registry = NacosRegistry::new(nacos_registry_url);
+        let registry = NacosRegistry::new(nacos_registry_url);
 
         let mut service_url = 
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider&timestamp=1670060843807").unwrap();
         service_url
             .params
             .insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
 
-        let ret = registry.register(service_url);
+        let ret = registry.register(service_url).await;
 
         info!("register result: {:?}", ret);
 
         let subscribe_url = 
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
 
-        let listener = Arc::new(TestNotifyListener);
-
-        let ret = registry.subscribe(subscribe_url, listener.clone());
+        let ret = registry.subscribe(subscribe_url).await;
 
         if let Err(e) = ret {
             error!("error message: {:?}", e);
             return;
         }
 
+        let mut rx = ret.unwrap();
+        let change = rx.recv().await;
+        info!("receive change: {:?}", change);
+
         let sleep_millis = time::Duration::from_secs(40);
         thread::sleep(sleep_millis);
 
         let unsubscribe_url = 
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-        let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
+        let ret = registry.unsubscribe(unsubscribe_url).await;
 
         if let Err(e) = ret {
             error!("error message: {:?}", e);
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index 2df5499..ebcb269 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -9,10 +9,13 @@ repository = "https://github.com/apache/dubbo-rust.git";
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-zookeeper = "0.7.0"
+zookeeper = "0.8.0"
 dubbo.workspace = true
+anyhow.workspace = true
 serde_json.workspace = true
 serde = { workspace = true, features = ["derive"] }
 urlencoding.workspace = true
 dubbo-logger.workspace = true
 dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index e8c2c5c..f3733d5 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -17,27 +17,20 @@
 
 #![allow(unused_variables, dead_code, missing_docs)]
 
-use std::{
-    collections::{HashMap, HashSet},
-    env,
-    sync::{Arc, Mutex, RwLock},
-    time::Duration,
-};
+use std::{collections::HashMap, env, sync::Arc, time::Duration};
 
+use async_trait::async_trait;
 use dubbo_base::{
     constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
     Url,
 };
 use dubbo_logger::tracing::{debug, error, info};
 use serde::{Deserialize, Serialize};
-#[allow(unused_imports)]
+use tokio::{select, sync::mpsc};
 use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, 
ZooKeeper};
 
 use dubbo::{
-    registry::{
-        memory_registry::MemoryRegistry, NotifyListener, Registry, 
RegistryNotifyListener,
-        ServiceEvent,
-    },
+    registry::n_registry::{DiscoverStream, Registry, ServiceChange},
     StdError,
 };
 
@@ -54,12 +47,9 @@ impl Watcher for LoggingWatcher {
     }
 }
 
-//#[derive(Debug)]
 pub struct ZookeeperRegistry {
     root_path: String,
     zk_client: Arc<ZooKeeper>,
-    listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
-    memory_registry: Arc<Mutex<MemoryRegistry>>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -91,24 +81,6 @@ impl ZookeeperRegistry {
         ZookeeperRegistry {
             root_path: "/services".to_string(),
             zk_client: Arc::new(zk_client),
-            listeners: RwLock::new(HashMap::new()),
-            memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
-        }
-    }
-
-    fn create_listener(
-        &self,
-        path: String,
-        service_name: String,
-        listener: RegistryNotifyListener,
-    ) -> ServiceInstancesChangedListener {
-        let mut service_names = HashSet::new();
-        service_names.insert(service_name.clone());
-        ServiceInstancesChangedListener {
-            zk_client: Arc::clone(&self.zk_client),
-            path,
-            service_name: service_name.clone(),
-            listener,
         }
     }
 
@@ -127,10 +99,6 @@ impl ZookeeperRegistry {
         s.to_string()
     }
 
-    pub fn get_client(&self) -> Arc<ZooKeeper> {
-        self.zk_client.clone()
-    }
-
     // If the parent node does not exist in the ZooKeeper, 
Err(ZkError::NoNode) will be returned.
     pub fn create_path(
         &self,
@@ -154,7 +122,7 @@ impl ZookeeperRegistry {
             Ok(_) => Ok(()),
             Err(err) => {
                 error!("zk path {} parent not exists.", path);
-                Err(Box::try_from(err).unwrap())
+                Err(err.into())
             }
         }
     }
@@ -176,16 +144,12 @@ impl ZookeeperRegistry {
             current.push('/');
             current.push_str(node_key);
             if !self.exists_path(current.as_str()) {
-                let new_create_mode = match children == node_key {
-                    true => create_mode,
-                    false => CreateMode::Persistent,
-                };
-                let new_data = match children == node_key {
-                    true => data,
-                    false => "",
+                let (new_create_mode, new_data) = match children == node_key {
+                    true => (create_mode, data),
+                    false => (CreateMode::Persistent, ""),
                 };
-                self.create_path(current.as_str(), new_data, new_create_mode)
-                    .unwrap();
+
+                self.create_path(current.as_str(), new_data, new_create_mode)?;
             }
         }
         Ok(())
@@ -193,7 +157,7 @@ impl ZookeeperRegistry {
 
     pub fn delete_path(&self, path: &str) {
         if self.exists_path(path) {
-            self.get_client().delete(path, None).unwrap()
+            self.zk_client.delete(path, None).unwrap()
         }
     }
 
@@ -213,6 +177,65 @@ impl ZookeeperRegistry {
             None
         }
     }
+
+    pub fn diff<'a>(
+        old_urls: &'a Vec<String>,
+        new_urls: &'a Vec<String>,
+    ) -> (Vec<String>, Vec<String>) {
+        let old_urls_map: HashMap<String, String> = old_urls
+            .iter()
+            .map(|url| dubbo_base::Url::from_url(url.as_str()))
+            .filter(|item| item.is_some())
+            .map(|item| item.unwrap())
+            .map(|item| {
+                let ip_port = item.get_ip_port();
+                let url = item.encoded_raw_url_string();
+                (ip_port, url)
+            })
+            .collect();
+
+        let new_urls_map: HashMap<String, String> = new_urls
+            .iter()
+            .map(|url| dubbo_base::Url::from_url(url.as_str()))
+            .filter(|item| item.is_some())
+            .map(|item| item.unwrap())
+            .map(|item| {
+                let ip_port = item.get_ip_port();
+                let url = item.encoded_raw_url_string();
+                (ip_port, url)
+            })
+            .collect();
+
+        let mut add_hosts = Vec::new();
+        let mut removed_hosts = Vec::new();
+
+        for (key, new_host) in new_urls_map.iter() {
+            let old_host = old_urls_map.get(key);
+            match old_host {
+                None => {
+                    add_hosts.push(new_host.clone());
+                }
+                Some(old_host) => {
+                    if !old_host.eq(new_host) {
+                        removed_hosts.push(old_host.clone());
+                        add_hosts.push(new_host.clone());
+                    }
+                }
+            }
+        }
+
+        for (key, old_host) in old_urls_map.iter() {
+            let new_host = old_urls_map.get(key);
+            match new_host {
+                None => {
+                    removed_hosts.push(old_host.clone());
+                }
+                Some(_) => {}
+            }
+        }
+
+        (removed_hosts, add_hosts)
+    }
 }
 
 impl Default for ZookeeperRegistry {
@@ -236,8 +259,9 @@ impl Default for ZookeeperRegistry {
     }
 }
 
+#[async_trait]
 impl Registry for ZookeeperRegistry {
-    fn register(&mut self, url: Url) -> Result<(), StdError> {
+    async fn register(&self, url: Url) -> Result<(), StdError> {
         debug!("register url: {}", url);
         let zk_path = format!(
             "/{}/{}/{}/{}",
@@ -250,7 +274,7 @@ impl Registry for ZookeeperRegistry {
         Ok(())
     }
 
-    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+    async fn unregister(&self, url: Url) -> Result<(), StdError> {
         let zk_path = format!(
             "/{}/{}/{}/{}",
             DUBBO_KEY,
@@ -263,194 +287,155 @@ impl Registry for ZookeeperRegistry {
     }
 
     // for consumer to find the changes of providers
-    fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), StdError> {
+    async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
         let service_name = url.get_service_name();
         let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, 
PROVIDERS_KEY);
-        if self
-            .listeners
-            .read()
-            .unwrap()
-            .get(service_name.as_str())
-            .is_some()
-        {
-            return Ok(());
-        }
 
-        self.listeners
-            .write()
-            .unwrap()
-            .insert(service_name.to_string(), listener.clone());
+        debug!("subscribe service: {}", zk_path);
 
-        let zk_listener =
-            self.create_listener(zk_path.clone(), service_name.to_string(), 
listener.clone());
+        let (listener, mut change_rx) = ZooKeeperListener::new();
+        let arc_listener = Arc::new(listener);
 
-        let zk_changed_paths = self.zk_client.get_children_w(&zk_path, 
zk_listener);
-        let result = match zk_changed_paths {
-            Err(err) => {
-                error!("zk subscribe error: {}", err);
-                Vec::new()
+        let watcher = ZooKeeperWatcher::new(arc_listener.clone(), 
zk_path.clone());
+
+        let (discover_tx, discover_rx) = mpsc::channel(64);
+
+        let zk_client_in_task = self.zk_client.clone();
+        let zk_path_in_task = zk_path.clone();
+        let service_name_in_task = service_name.clone();
+        let arc_listener_in_task = arc_listener.clone();
+        tokio::spawn(async move {
+            let zk_client = zk_client_in_task;
+            let zk_path = zk_path_in_task;
+            let service_name = service_name_in_task;
+            let listener = arc_listener_in_task;
+
+            let mut current_urls = Vec::new();
+
+            loop {
+                let changed = select! {
+                    _ = discover_tx.closed() => {
+                        info!("discover task quit, discover channel closed");
+                        None
+                    },
+                    changed = change_rx.recv() => {
+                        changed
+                    }
+                };
+
+                match changed {
+                    Some(_) => {
+                        let zookeeper_watcher =
+                            ZooKeeperWatcher::new(listener.clone(), 
zk_path.clone());
+
+                        match zk_client.get_children_w(&zk_path, 
zookeeper_watcher) {
+                            Ok(children) => {
+                                let (removed, add) =
+                                    ZookeeperRegistry::diff(&current_urls, 
&children);
+
+                                for url in removed {
+                                    match discover_tx
+                                        
.send(Ok(ServiceChange::Remove(url.clone())))
+                                        .await
+                                    {
+                                        Ok(_) => {}
+                                        Err(e) => {
+                                            error!("send service change 
failed: {:?}, maybe user unsubscribe", e);
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                for url in add {
+                                    match discover_tx
+                                        
.send(Ok(ServiceChange::Insert(url.clone(), ())))
+                                        .await
+                                    {
+                                        Ok(_) => {}
+                                        Err(e) => {
+                                            error!("send service change 
failed: {:?}, maybe user unsubscribe", e);
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                current_urls = children;
+                            }
+                            Err(err) => {
+                                error!("zk subscribe error: {}", err);
+                                break;
+                            }
+                        }
+                    }
+                    None => {
+                        error!("receive service change task quit, unsubscribe 
{}.", zk_path);
+                        break;
+                    }
+                }
             }
-            Ok(urls) => urls
-                .iter()
-                .map(|node_key| {
-                    let provider_url: Url = urlencoding::decode(node_key)
-                        .unwrap()
-                        .to_string()
-                        .as_str()
-                        .into();
-                    provider_url
-                })
-                .collect(),
-        };
-        info!("notifying {}->{:?}", service_name, result);
-        listener.notify(ServiceEvent {
-            key: service_name,
-            action: String::from("ADD"),
-            service: result,
+
+            debug!("unsubscribe service: {}", zk_path);
         });
-        Ok(())
-    }
 
-    fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) -> 
Result<(), StdError> {
-        todo!()
+        arc_listener.changed(zk_path);
+
+        Ok(discover_rx)
     }
-}
 
-pub struct ServiceInstancesChangedListener {
-    zk_client: Arc<ZooKeeper>,
-    path: String,
-    service_name: String,
-    listener: RegistryNotifyListener,
-}
+    async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+        let service_name = url.get_service_name();
+        let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, 
PROVIDERS_KEY);
 
-impl Watcher for ServiceInstancesChangedListener {
-    fn handle(&self, event: WatchedEvent) {
-        if let (WatchedEventType::NodeChildrenChanged, Some(path)) = 
(event.event_type, event.path)
-        {
-            let event_path = path.clone();
-            let dirs = self
-                .zk_client
-                .get_children(&event_path, false)
-                .expect("msg");
-            let result: Vec<Url> = dirs
-                .iter()
-                .map(|node_key| {
-                    let provider_url: Url = node_key.as_str().into();
-                    provider_url
-                })
-                .collect();
-            let res = self.zk_client.get_children_w(
-                &path,
-                ServiceInstancesChangedListener {
-                    zk_client: Arc::clone(&self.zk_client),
-                    path: path.clone(),
-                    service_name: self.service_name.clone(),
-                    listener: Arc::clone(&self.listener),
-                },
-            );
-
-            info!("notify {}->{:?}", self.service_name, result);
-            self.listener.notify(ServiceEvent {
-                key: self.service_name.clone(),
-                action: String::from("ADD"),
-                service: result,
-            });
-        }
+        info!("unsubscribe service: {}", zk_path);
+        Ok(())
     }
 }
 
-impl NotifyListener for ServiceInstancesChangedListener {
-    fn notify(&self, event: ServiceEvent) {
-        self.listener.notify(event);
+pub struct ZooKeeperListener {
+    tx: mpsc::Sender<String>,
+}
+
+impl ZooKeeperListener {
+    pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
+        let (tx, rx) = mpsc::channel(64);
+        let this = ZooKeeperListener { tx };
+        (this, rx)
     }
 
-    fn notify_all(&self, event: ServiceEvent) {
-        self.listener.notify(event);
+    pub fn changed(&self, path: String) {
+        match self.tx.try_send(path) {
+            Ok(_) => {}
+            Err(err) => {
+                error!("send change list to listener occur an error: {}", err);
+                return;
+            }
+        }
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-
-    use crate::ZookeeperRegistry;
-
-    struct TestZkWatcher {
-        pub watcher: Arc<Option<TestZkWatcher>>,
-    }
+pub struct ZooKeeperWatcher {
+    listener: Arc<ZooKeeperListener>,
+    path: String,
+}
 
-    impl Watcher for TestZkWatcher {
-        fn handle(&self, event: WatchedEvent) {
-            println!("event: {:?}", event);
-        }
+impl ZooKeeperWatcher {
+    pub fn new(listener: Arc<ZooKeeperListener>, path: String) -> 
ZooKeeperWatcher {
+        ZooKeeperWatcher { listener, path }
     }
+}
 
-    #[test]
-    fn zk_read_write_watcher() {
-        // 
https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
-        // using ENV to set zookeeper server urls
-        let zkr = ZookeeperRegistry::default();
-        let zk_client = zkr.get_client();
-        let watcher = TestZkWatcher {
-            watcher: Arc::new(None),
-        };
-        if zk_client.exists("/test", true).is_err() {
-            zk_client
-                .create(
-                    "/test",
-                    vec![1, 3],
-                    Acl::open_unsafe().clone(),
-                    CreateMode::Ephemeral,
-                )
-                .unwrap();
-        }
-        let zk_res = zk_client.create(
-            "/test",
-            "hello".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let result = zk_client.get_children_w("/test", watcher);
-        assert!(result.is_ok());
-        if zk_client.exists("/test/a", true).is_err() {
-            zk_client.delete("/test/a", None).unwrap();
-        }
-        if zk_client.exists("/test/a", true).is_err() {
-            zk_client.delete("/test/b", None).unwrap();
+impl Watcher for ZooKeeperWatcher {
+    fn handle(&self, event: WatchedEvent) {
+        info!("receive zookeeper event: {:?}", event);
+        let event_type: WatchedEventType = event.event_type;
+        match event_type {
+            WatchedEventType::None => {
+                info!("event type is none, ignore it.");
+                return;
+            }
+            _ => {}
         }
-        let zk_res = zk_client.create(
-            "/test/a",
-            "hello".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let zk_res = zk_client.create(
-            "/test/b",
-            "world".into(),
-            Acl::open_unsafe().clone(),
-            CreateMode::Ephemeral,
-        );
-        let test_a_result = zk_client.get_data("/test", true);
-        assert!(test_a_result.is_ok());
-        let vec1 = test_a_result.unwrap().0;
-        // data in /test should equals to "hello"
-        assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
-        zk_client.close().unwrap()
-    }
 
-    #[test]
-    fn create_path_with_parent_check() {
-        let zkr = ZookeeperRegistry::default();
-        let path = "/du1bbo/test11111";
-        let data = "hello";
-        // creating a child on a not exists parent, throw a NoNode error.
-        // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
-        // assert!(result.is_err());
-        let create_with_parent_check_result =
-            zkr.create_path_with_parent_check(path, data, 
CreateMode::Ephemeral);
-        assert!(create_with_parent_check_result.is_ok());
-        assert_eq!(data, zkr.get_data(path, false).unwrap());
+        self.listener.changed(self.path.clone());
     }
 }


Reply via email to