This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch refact/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/refact/cluster by this push:
new ab18d57 Rft: adapt nacos registry and zookeeper registry (#169)
ab18d57 is described below
commit ab18d5777ac1b84bfd513f8255893d5d9712a74f
Author: 毛文超 <[email protected]>
AuthorDate: Thu Dec 28 11:30:59 2023 +0800
Rft: adapt nacos registry and zookeeper registry (#169)
* Rft: adapt nacos registry and zookeeper registry
Close #168
* Rft: adapt static registry
* Rft: cargo fmt
---
dubbo/src/directory/mod.rs | 47 +-
dubbo/src/framework.rs | 11 +-
dubbo/src/registry/mod.rs | 79 ++--
dubbo/src/registry/n_registry.rs | 133 +++++-
dubbo/src/registry/protocol.rs | 31 +-
dubbo/src/registry/types.rs | 47 +-
dubbo/src/svc.rs | 4 -
dubbo/src/triple/client/builder.rs | 18 +-
examples/echo/src/generated/grpc.examples.echo.rs | 179 +++-----
examples/greeter/src/greeter/client.rs | 12 +-
examples/greeter/src/greeter/server.rs | 9 +-
registry/nacos/Cargo.toml | 4 +-
registry/nacos/src/lib.rs | 509 ++++++++++++----------
registry/zookeeper/Cargo.toml | 5 +-
registry/zookeeper/src/lib.rs | 413 +++++++++---------
15 files changed, 773 insertions(+), 728 deletions(-)
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index 84900aa..f0d9ebe 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -31,6 +31,7 @@ use crate::{
svc::NewService,
StdError,
};
+use dubbo_base::Url;
use dubbo_logger::tracing::debug;
use futures_core::ready;
use futures_util::future;
@@ -162,7 +163,11 @@ where
let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
tokio::spawn(async move {
- let receiver = registry.subscribe(service_name).await;
+ // todo use dubbo url model generate subscribe url
+ // category:serviceInterface:version:group
+ let consumer_url = format!("consumer://{}/{}", "127.0.0.1:8888",
service_name);
+ let subscribe_url = Url::from_url(&consumer_url).unwrap();
+ let receiver = registry.subscribe(subscribe_url).await;
debug!("discover start!");
match receiver {
Err(_e) => {
@@ -217,22 +222,32 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),
Self::Error>> {
loop {
let pin_discover = Pin::new(&mut self.discover);
- let change = ready!(pin_discover.poll_discover(cx))
- .transpose()
- .map_err(|e| e.into())?;
- match change {
- Some(Change::Remove(key)) => {
- debug!("remove key: {}", key);
- self.directory.remove(&key);
- }
- Some(Change::Insert(key, _)) => {
- debug!("insert key: {}", key);
- let invoker = self.new_invoker.new_service(key.clone());
- self.directory.insert(key, invoker);
+
+ match pin_discover.poll_discover(cx) {
+ Poll::Pending => {
+ if self.directory.is_empty() {
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(Ok(()));
+ }
}
- None => {
- debug!("stream closed");
- return Poll::Ready(Ok(()));
+ Poll::Ready(change) => {
+ let change = change.transpose().map_err(|e| e.into())?;
+ match change {
+ Some(Change::Remove(key)) => {
+ debug!("remove key: {}", key);
+ self.directory.remove(&key);
+ }
+ Some(Change::Insert(key, _)) => {
+ debug!("insert key: {}", key);
+ let invoker =
self.new_invoker.new_service(key.clone());
+ self.directory.insert(key, invoker);
+ }
+ None => {
+ debug!("stream closed");
+ return Poll::Ready(Ok(()));
+ }
+ }
}
}
}
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index d595f38..2666d25 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -25,9 +25,9 @@ use std::{
use crate::{
protocol::{BoxExporter, Protocol},
registry::{
+ n_registry::{ArcRegistry, Registry},
protocol::RegistryProtocol,
types::{Registries, RegistriesOperation},
- BoxRegistry, Registry,
},
};
use dubbo_base::Url;
@@ -60,14 +60,14 @@ impl Dubbo {
self
}
- pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry)
-> Self {
+ pub fn add_registry(mut self, registry_key: &str, registry: ArcRegistry)
-> Self {
if self.registries.is_none() {
self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
}
self.registries
.as_ref()
.unwrap()
- .insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
+ .insert(registry_key.to_string(), registry);
self
}
@@ -130,12 +130,13 @@ impl Dubbo {
async_vec.push(exporter);
//TODO multiple registry
if self.registries.is_some() {
- self.registries
+ let _ = self
+ .registries
.as_ref()
.unwrap()
.default_registry()
.register(url.clone())
- .unwrap();
+ .await;
}
}
}
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index d8ff6ef..b82fda8 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -17,47 +17,46 @@
#![allow(unused_variables, dead_code, missing_docs)]
pub mod integration;
-pub mod memory_registry;
pub mod n_registry;
pub mod protocol;
pub mod types;
-use std::{
- fmt::{Debug, Formatter},
- sync::Arc,
-};
-
-use dubbo_base::Url;
-
-pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync +
'static>;
-pub trait Registry {
- fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
- fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), crate::StdError>;
- fn unsubscribe(
- &self,
- url: Url,
- listener: RegistryNotifyListener,
- ) -> Result<(), crate::StdError>;
-}
-
-pub trait NotifyListener {
- fn notify(&self, event: ServiceEvent);
- fn notify_all(&self, event: ServiceEvent);
-}
-
-#[derive(Debug)]
-pub struct ServiceEvent {
- pub key: String,
- pub action: String,
- pub service: Vec<Url>,
-}
-
-pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
-
-impl Debug for BoxRegistry {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str("BoxRegistry")
- }
-}
+// use std::{
+// fmt::{Debug, Formatter},
+// sync::Arc,
+// };
+
+// use dubbo_base::Url;
+
+// pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync +
'static>;
+// pub trait Registry {
+// fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
+// fn unregister(&mut self, url: Url) -> Result<(), crate::StdError>;
+
+// fn subscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), crate::StdError>;
+// fn unsubscribe(
+// &self,
+// url: Url,
+// listener: RegistryNotifyListener,
+// ) -> Result<(), crate::StdError>;
+// }
+
+// pub trait NotifyListener {
+// fn notify(&self, event: ServiceEvent);
+// fn notify_all(&self, event: ServiceEvent);
+// }
+
+// #[derive(Debug)]
+// pub struct ServiceEvent {
+// pub key: String,
+// pub action: String,
+// pub service: Vec<Url>,
+// }
+
+// pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
+
+// impl Debug for BoxRegistry {
+// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+// f.write_str("BoxRegistry")
+// }
+// }
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
index 928c9b3..abcd56b 100644
--- a/dubbo/src/registry/n_registry.rs
+++ b/dubbo/src/registry/n_registry.rs
@@ -1,13 +1,22 @@
-use std::sync::Arc;
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
use async_trait::async_trait;
use dubbo_base::Url;
-use tokio::sync::mpsc::{channel, Receiver};
+use thiserror::Error;
+use tokio::sync::{
+ mpsc::{self, Receiver},
+ Mutex,
+};
use tower::discover::Change;
use crate::StdError;
-type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+pub type BoxRegistry = Box<dyn Registry + Send + Sync>;
#[async_trait]
pub trait Registry {
@@ -15,8 +24,7 @@ pub trait Registry {
async fn unregister(&self, url: Url) -> Result<(), StdError>;
- // todo service_name change to url
- async fn subscribe(&self, service_name: String) -> Result<DiscoverStream,
StdError>;
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
}
@@ -27,13 +35,19 @@ pub struct ArcRegistry {
}
pub enum RegistryComponent {
- NacosRegistry,
+ NacosRegistry(ArcRegistry),
ZookeeperRegistry,
StaticRegistry(StaticRegistry),
}
+pub struct StaticServiceValues {
+ listeners: Vec<mpsc::Sender<Result<ServiceChange, StdError>>>,
+ urls: HashSet<String>,
+}
+
+#[derive(Default)]
pub struct StaticRegistry {
- urls: Vec<Url>,
+ urls: Mutex<HashMap<String, StaticServiceValues>>,
}
impl ArcRegistry {
@@ -54,8 +68,8 @@ impl Registry for ArcRegistry {
self.inner.unregister(url).await
}
- async fn subscribe(&self, service_name: String) -> Result<DiscoverStream,
StdError> {
- self.inner.subscribe(service_name).await
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ self.inner.subscribe(url).await
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
@@ -73,11 +87,11 @@ impl Registry for RegistryComponent {
todo!()
}
- async fn subscribe(&self, service_name: String) -> Result<DiscoverStream,
StdError> {
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
match self {
- RegistryComponent::NacosRegistry => todo!(),
+ RegistryComponent::NacosRegistry(registry) =>
registry.subscribe(url).await,
RegistryComponent::ZookeeperRegistry => todo!(),
- RegistryComponent::StaticRegistry(registry) =>
registry.subscribe(service_name).await,
+ RegistryComponent::StaticRegistry(registry) =>
registry.subscribe(url).await,
}
}
@@ -88,31 +102,102 @@ impl Registry for RegistryComponent {
impl StaticRegistry {
pub fn new(urls: Vec<Url>) -> Self {
- Self { urls }
+ let mut map = HashMap::with_capacity(urls.len());
+
+ for url in urls {
+ let service_name = url.get_service_name();
+ let static_values = map
+ .entry(service_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+ }
+
+ Self {
+ urls: Mutex::new(map),
+ }
}
}
#[async_trait]
impl Registry for StaticRegistry {
async fn register(&self, url: Url) -> Result<(), StdError> {
- todo!()
+ let service_name = url.get_service_name();
+ let mut lock = self.urls.lock().await;
+
+ let static_values = lock
+ .entry(service_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+ let url = url.to_string();
+ static_values.urls.insert(url.clone());
+
+ static_values.listeners.retain(|listener| {
+ let ret = listener.try_send(Ok(ServiceChange::Insert(url.clone(),
())));
+ ret.is_ok()
+ });
+
+ Ok(())
}
async fn unregister(&self, url: Url) -> Result<(), StdError> {
- todo!()
- }
-
- async fn subscribe(&self, service_name: String) -> Result<DiscoverStream,
StdError> {
- let (tx, rx) = channel(self.urls.len());
- for url in self.urls.iter() {
- let change = Ok(Change::Insert(url.to_url(), ()));
- tx.send(change).await?;
+ let service_name = url.get_service_name();
+ let mut lock = self.urls.lock().await;
+
+ match lock.get_mut(&service_name) {
+ None => Ok(()),
+ Some(static_values) => {
+ let url = url.to_string();
+ static_values.urls.remove(&url);
+ static_values.listeners.retain(|listener| {
+ let ret =
listener.try_send(Ok(ServiceChange::Remove(url.clone())));
+ ret.is_ok()
+ });
+ if static_values.urls.is_empty() {
+ lock.remove(&service_name);
+ }
+ Ok(())
+ }
}
+ }
- Ok(rx)
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
+ let service_name = url.get_service_name();
+
+ let change_rx = {
+ let mut lock = self.urls.lock().await;
+ let static_values = lock
+ .entry(service_name)
+ .or_insert_with(|| StaticServiceValues {
+ listeners: Vec::new(),
+ urls: HashSet::new(),
+ });
+
+ let (tx, change_rx) = mpsc::channel(64);
+ static_values.listeners.push(tx);
+
+ for url in static_values.urls.iter() {
+ static_values.listeners.retain(|listener| {
+ let ret =
listener.try_send(Ok(ServiceChange::Insert(url.clone(), ())));
+ ret.is_ok()
+ });
+ }
+ change_rx
+ };
+
+ Ok(change_rx)
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
- todo!()
+ Ok(())
}
}
+
+#[derive(Error, Debug)]
+#[error("static registry error: {0}")]
+struct StaticRegistryError(String);
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 1250950..b9ba722 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -19,11 +19,10 @@ use dubbo_base::Url;
use dubbo_logger::tracing;
use std::{
collections::HashMap,
- fmt::{Debug, Formatter},
- sync::{Arc, Mutex, RwLock},
+ sync::{Arc, RwLock},
};
-use super::{memory_registry::MemoryRegistry, BoxRegistry};
+use super::n_registry::{ArcRegistry, Registry, StaticRegistry};
use crate::{
protocol::{
triple::{triple_exporter::TripleExporter,
triple_protocol::TripleProtocol},
@@ -42,19 +41,6 @@ pub struct RegistryProtocol {
services: HashMap<String, Vec<Url>>,
}
-impl Debug for RegistryProtocol {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.write_str(
- format!(
- "RegistryProtocol services{:#?},registries,{:#?}",
- self.services,
- self.registries.clone()
- )
- .as_str(),
- )
- }
-}
-
impl RegistryProtocol {
pub fn new() -> Self {
RegistryProtocol {
@@ -74,16 +60,17 @@ impl RegistryProtocol {
self
}
- pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
- let mem = MemoryRegistry::default();
+ pub fn get_registry(&mut self, url: Url) -> ArcRegistry {
+ let mem = StaticRegistry::default();
+ let mem = ArcRegistry::new(mem);
self.registries
.as_ref()
.unwrap()
.lock()
.unwrap()
- .insert(url.location, Arc::new(Mutex::new(Box::new(mem.clone()))));
+ .insert(url.location, mem.clone());
- Box::new(mem)
+ mem
}
}
@@ -105,8 +92,8 @@ impl Protocol for RegistryProtocol {
if let Some(urls) = registry_url {
for url in urls.clone().iter() {
if !url.service_key.is_empty() {
- let mut reg = self.get_registry(url.clone());
- reg.register(url.clone()).unwrap();
+ let reg = self.get_registry(url.clone());
+ let _ = reg.register(url.clone()).await;
}
}
}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
index ae7c7ca..5c1687d 100644
--- a/dubbo/src/registry/types.rs
+++ b/dubbo/src/registry/types.rs
@@ -20,30 +20,22 @@ use std::{
sync::{Arc, Mutex},
};
-use dubbo_base::Url;
-use dubbo_logger::tracing::info;
use itertools::Itertools;
-use crate::{
- registry::{BoxRegistry, Registry},
- StdError,
-};
-
-use super::RegistryNotifyListener;
+use super::n_registry::ArcRegistry;
-pub type SafeRegistry = Arc<Mutex<BoxRegistry>>;
-pub type Registries = Arc<Mutex<HashMap<String, SafeRegistry>>>;
+pub type Registries = Arc<Mutex<HashMap<String, ArcRegistry>>>;
pub const DEFAULT_REGISTRY_KEY: &str = "default";
pub trait RegistriesOperation {
- fn get(&self, registry_key: &str) -> SafeRegistry;
- fn insert(&self, registry_key: String, registry: SafeRegistry);
- fn default_registry(&self) -> SafeRegistry;
+ fn get(&self, registry_key: &str) -> ArcRegistry;
+ fn insert(&self, registry_key: String, registry: ArcRegistry);
+ fn default_registry(&self) -> ArcRegistry;
}
impl RegistriesOperation for Registries {
- fn get(&self, registry_key: &str) -> SafeRegistry {
+ fn get(&self, registry_key: &str) -> ArcRegistry {
self.as_ref()
.lock()
.unwrap()
@@ -52,11 +44,11 @@ impl RegistriesOperation for Registries {
.clone()
}
- fn insert(&self, registry_key: String, registry: SafeRegistry) {
+ fn insert(&self, registry_key: String, registry: ArcRegistry) {
self.as_ref().lock().unwrap().insert(registry_key, registry);
}
- fn default_registry(&self) -> SafeRegistry {
+ fn default_registry(&self) -> ArcRegistry {
let guard = self.as_ref().lock().unwrap();
let (_, result) = guard
.iter()
@@ -66,26 +58,3 @@ impl RegistriesOperation for Registries {
result.clone()
}
}
-
-impl Registry for SafeRegistry {
- fn register(&mut self, url: Url) -> Result<(), StdError> {
- info!("register {}.", url);
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn unregister(&mut self, url: Url) -> Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-
- fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), StdError> {
- self.lock().unwrap().register(url).expect("registry err.");
- Ok(())
- }
-}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
index df4c071..db59b92 100644
--- a/dubbo/src/svc.rs
+++ b/dubbo/src/svc.rs
@@ -1,9 +1,5 @@
use std::{marker::PhantomData, sync::Arc};
-
-
-
-
pub trait NewService<T> {
type Service;
diff --git a/dubbo/src/triple/client/builder.rs
b/dubbo/src/triple/client/builder.rs
index c4e6e62..0dadbcc 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -59,9 +59,10 @@ impl ClientBuilder {
Self {
timeout: None,
connector: "",
- registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
- StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
- ))),
+ registry:
Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
+ host,
+ )
+ .unwrap()]))),
direct: true,
host: host.to_string(),
}
@@ -74,18 +75,19 @@ impl ClientBuilder {
}
}
- pub fn with_registry(self, registry: RegistryComponent) -> Self {
+ pub fn with_registry(self, registry: ArcRegistry) -> Self {
Self {
- registry: Some(ArcRegistry::new(registry)),
+ registry: Some(registry),
..self
}
}
pub fn with_host(self, host: &'static str) -> Self {
Self {
- registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
- StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
- ))),
+ registry:
Some(ArcRegistry::new(StaticRegistry::new(vec![Url::from_url(
+ host,
+ )
+ .unwrap()]))),
..self
}
}
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs
b/examples/echo/src/generated/grpc.examples.echo.rs
index 07c58fe..4521c9f 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -36,16 +36,12 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
- let path = http::uri::PathAndQuery::from_static(
- "/grpc.examples.echo.Echo/UnaryEcho",
- );
+ let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
@@ -53,51 +49,51 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner.server_streaming(request, codec, path, invocation).await
+ self.inner
+ .server_streaming(request, codec, path, invocation)
+ .await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner.client_streaming(request, codec, path, invocation).await
+ self.inner
+ .client_streaming(request, codec, path, invocation)
+ .await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner.bidi_streaming(request, codec, path, invocation).await
+ self.inner
+ .bidi_streaming(request, codec, path, invocation)
+ .await
}
}
}
@@ -114,9 +110,7 @@ pub mod echo_server {
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
- type ServerStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type ServerStreamingEchoStream: futures_util::Stream<Item =
Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
@@ -130,19 +124,14 @@ pub mod echo_server {
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho
method.
- type BidirectionalStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type BidirectionalStreamingEchoStream: futures_util::Stream<Item =
Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
- ) -> Result<
- Response<Self::BidirectionalStreamingEchoStream>,
- dubbo::status::Status,
- >;
+ ) -> Result<Response<Self::BidirectionalStreamingEchoStream>,
dubbo::status::Status>;
}
/// Echo is the echo service.
#[derive(Debug)]
@@ -172,10 +161,7 @@ pub mod echo_server {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(),
Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -188,26 +174,18 @@ pub mod echo_server {
}
impl<T: Echo> UnarySvc<super::EchoRequest> for
UnaryEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future = BoxFuture<Response<Self::Response>,
dubbo::status::Status>;
+ fn call(&mut self, request:
Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move {
inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server =
TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server.unary(UnaryEchoServer { inner },
req).await;
Ok(res)
};
@@ -218,32 +196,22 @@ pub mod echo_server {
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
- for ServerStreamingEchoServer<T> {
+ impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for
ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>,
dubbo::status::Status>;
+ fn call(&mut self, request:
Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.server_streaming_echo(request).await
- };
+ let fut = async move {
inner.server_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server =
TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.server_streaming(ServerStreamingEchoServer {
inner }, req)
.await;
@@ -256,31 +224,23 @@ pub mod echo_server {
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
- for ClientStreamingEchoServer<T> {
+ impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for
ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
+ type Future = BoxFuture<Response<Self::Response>,
dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.client_streaming_echo(request).await
- };
+ let fut = async move {
inner.client_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server =
TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.client_streaming(ClientStreamingEchoServer {
inner }, req)
.await;
@@ -293,56 +253,41 @@ pub mod echo_server {
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> StreamingSvc<super::EchoRequest>
- for BidirectionalStreamingEchoServer<T> {
+ impl<T: Echo> StreamingSvc<super::EchoRequest> for
BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream =
T::BidirectionalStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>,
dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
-
inner.bidirectional_streaming_echo(request).await
- };
+ let fut =
+ async move {
inner.bidirectional_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server =
TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
- .bidi_streaming(
- BidirectionalStreamingEchoServer {
- inner,
- },
- req,
- )
+ .bidi_streaming(BidirectionalStreamingEchoServer {
inner }, req)
.await;
Ok(res)
};
Box::pin(fut)
}
- _ => {
- Box::pin(async move {
- Ok(
- http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap(),
- )
- })
- }
+ _ => Box::pin(async move {
+ Ok(http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap())
+ }),
}
}
}
diff --git a/examples/greeter/src/greeter/client.rs
b/examples/greeter/src/greeter/client.rs
index afa1b20..cb92ed0 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,17 +21,21 @@ pub mod protos {
}
use std::env;
-
-use dubbo::codegen::*;
+use dubbo::{codegen::*, registry::n_registry::ArcRegistry};
+
+use dubbo_base::Url;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
+use registry_nacos::NacosRegistry;
#[tokio::main]
async fn main() {
dubbo_logger::init();
-
- let builder = ClientBuilder::new().with_host("http://127.0.0.1:8888");
+
+ let builder =
ClientBuilder::new().with_registry(ArcRegistry::new(NacosRegistry::new(
+ Url::from_url("nacos://127.0.0.1:8848").unwrap(),
+ )));
let mut cli = GreeterClient::new(builder);
diff --git a/examples/greeter/src/greeter/server.rs
b/examples/greeter/src/greeter/server.rs
index c3bc4c5..fd436e5 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -18,11 +18,13 @@
use std::{io::ErrorKind, pin::Pin};
use async_trait::async_trait;
+use dubbo_base::Url;
use futures_util::{Stream, StreamExt};
+use registry_nacos::NacosRegistry;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use dubbo::{codegen::*, Dubbo, registry::memory_registry::MemoryRegistry};
+use dubbo::{codegen::*, registry::n_registry::ArcRegistry, Dubbo};
use dubbo_config::RootConfig;
use dubbo_logger::{
tracing::{info, span},
@@ -56,10 +58,11 @@ async fn main() {
Ok(config) => config,
Err(_err) => panic!("err: {:?}", _err), // response was droped
};
+
+ let nacos_registry =
NacosRegistry::new(Url::from_url("nacos://127.0.0.1:8848").unwrap());
let mut f = Dubbo::new()
.with_config(r)
- .add_registry("memory_registry", Box::new(MemoryRegistry::new()));
-
+ .add_registry("nacos-registry", ArcRegistry::new(nacos_registry));
f.start().await;
}
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 798253a..1fa6c19 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -9,13 +9,15 @@ repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http"] }
+nacos-sdk = { version = "0.3", features = ["naming", "auth-by-http", "async"] }
dubbo.workspace = true
serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
anyhow.workspace = true
dubbo-logger.workspace = true
dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
[dev-dependencies]
tracing-subscriber = "0.3.16"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index bbbaaa2..ad34237 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -16,16 +16,20 @@
*/
mod utils;
+use async_trait::async_trait;
use dubbo_base::Url;
-use std::{
- collections::{HashMap, HashSet},
- sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, sync::Arc};
+use tokio::{select, sync::mpsc};
use anyhow::anyhow;
-use dubbo::registry::{NotifyListener, Registry, RegistryNotifyListener,
ServiceEvent};
-use dubbo_logger::tracing::{error, info, warn};
-use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder,
ServiceInstance};
+use dubbo::{
+ registry::n_registry::{DiscoverStream, Registry, ServiceChange},
+ StdError,
+};
+use dubbo_logger::tracing::{debug, error, info};
+use nacos_sdk::api::naming::{
+ NamingEventListener, NamingService, NamingServiceBuilder, ServiceInstance,
+};
use crate::utils::{build_nacos_client_props, is_concrete_str, is_wildcard_str,
match_range};
@@ -60,7 +64,6 @@ const INNERCLASS_COMPATIBLE_SYMBOL: &str = "___";
pub struct NacosRegistry {
nacos_naming_service: Arc<dyn NamingService + Sync + Send + 'static>,
- listeners: Mutex<HashMap<String, HashSet<Arc<NotifyListenerWrapper>>>>,
}
impl NacosRegistry {
@@ -77,49 +80,6 @@ impl NacosRegistry {
Self {
nacos_naming_service: Arc::new(nacos_naming_service),
- listeners: Mutex::new(HashMap::new()),
- }
- }
-
- #[allow(dead_code)]
- fn get_subscribe_service_names(&self, service_name: &NacosServiceName) ->
HashSet<String> {
- if service_name.is_concrete() {
- let mut set = HashSet::new();
- let service_subscribe_name = service_name.to_subscriber_str();
- let service_subscriber_legacy_name =
service_name.to_subscriber_legacy_string();
- if service_subscribe_name.eq(&service_subscriber_legacy_name) {
- set.insert(service_subscribe_name);
- } else {
- set.insert(service_subscribe_name);
- set.insert(service_subscriber_legacy_name);
- }
-
- set
- } else {
- let list_view = self.nacos_naming_service.get_service_list(
- 1,
- i32::MAX,
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- );
- if let Err(e) = list_view {
- error!("list service instances occur an error: {:?}", e);
- return HashSet::default();
- }
-
- let list_view = list_view.unwrap();
- let set: HashSet<String> = list_view
- .0
- .into_iter()
- .filter(|service_name|
service_name.split(SERVICE_NAME_SEPARATOR).count() == 4)
- .map(|service_name|
NacosServiceName::from_service_name_str(&service_name))
- .filter(|other_service_name|
service_name.is_compatible(other_service_name))
- .map(|service_name| service_name.to_subscriber_str())
- .collect();
- set
}
}
}
@@ -135,20 +95,66 @@ impl NacosRegistry {
..Default::default()
}
}
+
+ fn diff<'a>(
+ old_service: &'a Vec<ServiceInstance>,
+ new_services: &'a Vec<ServiceInstance>,
+ ) -> (Vec<&'a ServiceInstance>, Vec<&'a ServiceInstance>) {
+ let new_hosts_map: HashMap<String, &ServiceInstance> = new_services
+ .iter()
+ .map(|hosts| (hosts.ip_and_port(), hosts))
+ .collect();
+
+ let old_hosts_map: HashMap<String, &ServiceInstance> = old_service
+ .iter()
+ .map(|hosts| (hosts.ip_and_port(), hosts))
+ .collect();
+
+ let mut add_hosts = Vec::<&ServiceInstance>::new();
+ let mut removed_hosts = Vec::<&ServiceInstance>::new();
+
+ for (key, new_host) in new_hosts_map.iter() {
+ let old_host = old_hosts_map.get(key);
+ match old_host {
+ None => {
+ add_hosts.push(*new_host);
+ }
+ Some(old_host) => {
+ if !old_host.is_same_instance(new_host) {
+ removed_hosts.push(*old_host);
+ add_hosts.push(*new_host);
+ }
+ }
+ }
+ }
+
+ for (key, old_host) in old_hosts_map.iter() {
+ let new_host = new_hosts_map.get(key);
+ match new_host {
+ None => {
+ removed_hosts.push(*old_host);
+ }
+ Some(_) => {}
+ }
+ }
+
+ (removed_hosts, add_hosts)
+ }
}
+#[async_trait]
impl Registry for NacosRegistry {
- fn register(&mut self, url: Url) -> Result<(), dubbo::StdError> {
- let side = url.get_param(SIDE_KEY).unwrap_or_default();
- let register_consumer = url
- .get_param(REGISTER_CONSUMER_URL_KEY)
- .unwrap_or_else(|| false.to_string())
- .parse::<bool>()
- .unwrap_or(false);
- if side.ne(PROVIDER_SIDE) && !register_consumer {
- warn!("Please set
'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url
registration.");
- return Ok(());
- }
+ async fn register(&self, url: Url) -> Result<(), dubbo::StdError> {
+ // let side = url.get_param(SIDE_KEY).unwrap_or_default();
+ // let register_consumer = url
+ // .get_param(REGISTER_CONSUMER_URL_KEY)
+ // .unwrap_or_else(|| false.to_string())
+ // .parse::<bool>()
+ // .unwrap_or(false);
+ // if side.ne(PROVIDER_SIDE) && !register_consumer {
+ // warn!("Please set
'dubbo.registry.parameters.register-consumer-url=true' to turn on consumer url
registration.");
+ // return Ok(());
+ // }
let nacos_service_name = NacosServiceName::new(&url);
@@ -162,11 +168,10 @@ impl Registry for NacosRegistry {
let nacos_service_instance = Self::create_nacos_service_instance(url);
info!("register service: {}", nacos_service_name);
- let ret = self.nacos_naming_service.register_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
+ let ret = self
+ .nacos_naming_service
+ .register_instance(nacos_service_name, group_name,
nacos_service_instance)
+ .await;
if let Err(e) = ret {
error!("register to nacos occur an error: {:?}", e);
return Err(anyhow!("register to nacos occur an error: {:?}",
e).into());
@@ -175,7 +180,7 @@ impl Registry for NacosRegistry {
Ok(())
}
- fn unregister(&mut self, url: Url) -> Result<(), dubbo::StdError> {
+ async fn unregister(&self, url: Url) -> Result<(), dubbo::StdError> {
let nacos_service_name = NacosServiceName::new(&url);
let group_name = Some(
@@ -189,11 +194,10 @@ impl Registry for NacosRegistry {
info!("deregister service: {}", nacos_service_name);
- let ret = self.nacos_naming_service.deregister_instance(
- nacos_service_name,
- group_name,
- nacos_service_instance,
- );
+ let ret = self
+ .nacos_naming_service
+ .deregister_instance(nacos_service_name, group_name,
nacos_service_instance)
+ .await;
if let Err(e) = ret {
error!("deregister service from nacos occur an error: {:?}", e);
return Err(anyhow!("deregister service from nacos occur an error:
{:?}", e).into());
@@ -201,101 +205,161 @@ impl Registry for NacosRegistry {
Ok(())
}
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), dubbo::StdError> {
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
-
- info!("subscribe: {}", &url_str);
+ let service_group = service_name
+ .get_group_with_default(DEFAULT_GROUP)
+ .to_string();
+ let subscriber_url = service_name.to_subscriber_str();
+ info!("subscribe: {}", subscriber_url);
+
+ let (listener, mut change_receiver) = ServiceChangeListener::new();
+ let arc_listener = Arc::new(listener);
+
+ let (discover_tx, discover_rx) = mpsc::channel(64);
+
+ let nacos_naming_service = self.nacos_naming_service.clone();
+
+ let listener_in_task = arc_listener.clone();
+ let service_group_in_task = service_group.clone();
+ let subscriber_url_in_task = subscriber_url.clone();
+ tokio::spawn(async move {
+ let listener = listener_in_task;
+ let service_group = service_group_in_task;
+ let subscriber_url = subscriber_url_in_task;
+
+ let mut current_instances = Vec::new();
+ loop {
+ let change = select! {
+ _ = discover_tx.closed() => {
+ debug!("service {} change task quit, unsubscribe.",
subscriber_url);
+ None
+ },
+ change = change_receiver.recv() => change
+ };
+
+ match change {
+ Some(instances) => {
+ debug!("service {} changed", subscriber_url);
+ let (remove_instances, add_instances) =
+ NacosRegistry::diff(¤t_instances,
&instances);
+
+ for instance in remove_instances {
+ let service_name = instance.service_name.as_ref();
+ let url = match service_name {
+ None => {
+ format!("triple://{}:{}", instance.ip(),
instance.port())
+ }
+ Some(service_name) => {
+ format!(
+ "triple://{}:{}/{}",
+ instance.ip(),
+ instance.port(),
+ service_name
+ )
+ }
+ };
+
+ match
discover_tx.send(Ok(ServiceChange::Remove(url))).await {
+ Ok(_) => {}
+ Err(e) => {
+ error!(
+ "send service change failed: {:?},
maybe user unsubscribe",
+ e
+ );
+ break;
+ }
+ }
+ }
+
+ for instance in add_instances {
+ let service_name = instance.service_name.as_ref();
+ let url = match service_name {
+ None => {
+ format!("triple://{}:{}", instance.ip(),
instance.port())
+ }
+ Some(service_name) => {
+ format!(
+ "triple://{}:{}/{}",
+ instance.ip(),
+ instance.port(),
+ service_name
+ )
+ }
+ };
+
+ match
discover_tx.send(Ok(ServiceChange::Insert(url, ()))).await {
+ Ok(_) => {}
+ Err(e) => {
+ error!(
+ "send service change failed: {:?},
maybe user unsubscribe",
+ e
+ );
+ break;
+ }
+ }
+ }
+ current_instances = instances;
+ }
+ None => {
+ error!(
+ "receive service change task quit, unsubscribe
{}.",
+ subscriber_url
+ );
+ break;
+ }
+ }
+ }
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
+ debug!("unsubscribe service: {}", subscriber_url);
+ // unsubscribe
+ let unsubscribe = nacos_naming_service
+ .unsubscribe(subscriber_url, Some(service_group), Vec::new(),
listener)
+ .await;
+
+ match unsubscribe {
+ Ok(_) => {}
+ Err(e) => {
+ error!("unsubscribe service failed: {:?}", e);
+ }
+ }
+ });
+
+ let all_instance = self
+ .nacos_naming_service
+ .get_all_instances(
+ subscriber_url.clone(),
+ Some(service_group.clone()),
+ Vec::new(),
+ false,
+ )
+ .await?;
+ let _ = arc_listener.changed(all_instance);
+
+ match self
+ .nacos_naming_service
+ .subscribe(
+ subscriber_url.clone(),
+ Some(service_group.clone()),
+ Vec::new(),
+ arc_listener,
+ )
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
error!("subscribe service failed: {:?}", e);
return Err(anyhow!("subscribe service failed: {:?}",
e).into());
}
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
-
- let wrapper = Arc::new(NotifyListenerWrapper(listener));
- if let Some(listener_set) = listener_set {
- listener_set.insert(wrapper.clone());
- } else {
- let mut hash_set = HashSet::new();
- hash_set.insert(wrapper.clone());
- listeners.insert(url_str, hash_set);
- }
-
- wrapper
- };
-
- let ret = self.nacos_naming_service.subscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("subscribe service failed: {:?}", e);
- return Err(anyhow!("subscribe service failed: {:?}", e).into());
}
- Ok(())
+ Ok(discover_rx)
}
- fn unsubscribe(
- &self,
- url: Url,
- listener: RegistryNotifyListener,
- ) -> Result<(), dubbo::StdError> {
+ async fn unsubscribe(&self, url: Url) -> Result<(), dubbo::StdError> {
let service_name = NacosServiceName::new(&url);
- let url_str = url.to_url();
- info!("unsubscribe: {}", &url_str);
-
- let nacos_listener: Arc<NotifyListenerWrapper> = {
- let listeners = self.listeners.lock();
- if let Err(e) = listeners {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}",
e).into());
- }
-
- let mut listeners = listeners.unwrap();
- let listener_set = listeners.get_mut(url_str.as_str());
- if listener_set.is_none() {
- return Ok(());
- }
-
- let listener_set = listener_set.unwrap();
-
- let listener = Arc::new(NotifyListenerWrapper(listener));
- let listener = listener_set.take(&listener);
- if listener.is_none() {
- return Ok(());
- }
-
- listener.unwrap()
- };
-
- let ret = self.nacos_naming_service.unsubscribe(
- service_name.to_subscriber_str(),
- Some(
- service_name
- .get_group_with_default(DEFAULT_GROUP)
- .to_string(),
- ),
- Vec::new(),
- nacos_listener,
- );
-
- if let Err(e) = ret {
- error!("unsubscribe service failed: {:?}", e);
- return Err(anyhow!("unsubscribe service failed: {:?}", e).into());
- }
+ let subscriber_url = service_name.to_subscriber_str();
+ info!("unsubscribe: {}", &subscriber_url);
Ok(())
}
@@ -484,52 +548,43 @@ impl NacosServiceName {
}
}
-struct NotifyListenerWrapper(Arc<dyn NotifyListener + Sync + Send + 'static>);
-
-impl std::hash::Hash for NotifyListenerWrapper {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- let ptr = self.0.as_ref();
- std::ptr::hash(ptr, state);
- }
+struct ServiceChangeListener {
+ tx: mpsc::Sender<Vec<ServiceInstance>>,
}
-impl PartialEq for NotifyListenerWrapper {
- fn eq(&self, other: &Self) -> bool {
- let self_ptr = self.0.as_ref() as *const dyn NotifyListener;
- let other_ptr = other.0.as_ref() as *const dyn NotifyListener;
+impl ServiceChangeListener {
+ pub fn new() -> (Self, mpsc::Receiver<Vec<ServiceInstance>>) {
+ let (tx, rx) = mpsc::channel(64);
+ let this = Self { tx };
- let (self_data_ptr, _): (*const u8, *const u8) = unsafe {
std::mem::transmute(self_ptr) };
+ (this, rx)
+ }
- let (other_data_ptr, _): (*const u8, *const u8) = unsafe {
std::mem::transmute(other_ptr) };
- self_data_ptr == other_data_ptr
+ pub fn changed(&self, instances: Vec<ServiceInstance>) -> Result<(),
dubbo::StdError> {
+ match self.tx.try_send(instances) {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ error!("send service change failed: {:?}", e);
+ Err(anyhow!("send service change failed: {:?}", e).into())
+ }
+ }
}
}
-impl Eq for NotifyListenerWrapper {}
-
-impl nacos_sdk::api::naming::NamingEventListener for NotifyListenerWrapper {
+impl NamingEventListener for ServiceChangeListener {
fn event(&self, event: Arc<nacos_sdk::api::naming::NamingChangeEvent>) {
- let service_name = event.service_name.clone();
+ debug!("service change {}", event.service_name.clone());
+ debug!("nacos event: {:?}", event);
+
let instances = event.instances.as_ref();
- let urls: Vec<Url>;
- if let Some(instances) = instances {
- urls = instances
- .iter()
- .filter_map(|data| {
- let url_str =
- format!("triple://{}:{}/{}", data.ip(), data.port(),
service_name);
- Url::from_url(&url_str)
- })
- .collect();
- } else {
- urls = Vec::new();
+ match instances {
+ None => {
+ let _ = self.changed(Vec::default());
+ }
+ Some(instances) => {
+ let _ = self.changed(instances.clone());
+ }
}
- let notify_event = ServiceEvent {
- key: service_name,
- action: String::from("CHANGE"),
- service: urls,
- };
- self.0.notify(notify_event);
}
}
@@ -543,9 +598,9 @@ pub mod tests {
use super::*;
- #[test]
+ #[tokio::test]
#[ignore]
- pub fn test_register_to_nacos() {
+ pub async fn test_register_to_nacos() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -556,14 +611,14 @@ pub mod tests {
.init();
let nacos_registry_url =
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let registry = NacosRegistry::new(nacos_registry_url);
let mut service_url =
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
service_url
.params
.insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
- let ret = registry.register(service_url);
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
@@ -571,9 +626,9 @@ pub mod tests {
thread::sleep(sleep_millis);
}
- #[test]
+ #[tokio::test]
#[ignore]
- pub fn test_register_and_unregister() {
+ pub async fn test_register_and_unregister() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -584,14 +639,14 @@ pub mod tests {
.init();
let nacos_registry_url =
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let registry = NacosRegistry::new(nacos_registry_url);
let mut service_url =
Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
service_url
.params
.insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
- let ret = registry.register(service_url);
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
@@ -599,7 +654,7 @@ pub mod tests {
thread::sleep(sleep_millis);
let unregister_url =
Url::from_url("tri://127.0.0.1:9090/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
- let ret = registry.unregister(unregister_url);
+ let ret = registry.unregister(unregister_url).await;
info!("deregister result: {:?}", ret);
@@ -607,20 +662,9 @@ pub mod tests {
thread::sleep(sleep_millis);
}
- struct TestNotifyListener;
- impl NotifyListener for TestNotifyListener {
- fn notify(&self, event: ServiceEvent) {
- info!("notified: {:?}", event.key);
- }
-
- fn notify_all(&self, event: ServiceEvent) {
- info!("notify_all: {:?}", event.key);
- }
- }
-
- #[test]
+ #[tokio::test]
#[ignore]
- fn test_subscribe() {
+ pub async fn test_subscribe() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -631,33 +675,36 @@ pub mod tests {
.init();
let nacos_registry_url =
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let registry = NacosRegistry::new(nacos_registry_url);
let mut service_url =
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
service_url
.params
.insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
- let ret = registry.register(service_url);
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
- let subscribe_url =
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
-
- let ret = registry.subscribe(subscribe_url,
Arc::new(TestNotifyListener));
+ let subscribe_url =
Url::from_url("consumer://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
+ let subscribe_ret = registry.subscribe(subscribe_url).await;
- if let Err(e) = ret {
+ if let Err(e) = subscribe_ret {
error!("error message: {:?}", e);
return;
}
+ let mut rx = subscribe_ret.unwrap();
+ let change = rx.recv().await;
+ info!("receive change: {:?}", change);
+
let sleep_millis = time::Duration::from_secs(300);
thread::sleep(sleep_millis);
}
- #[test]
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
- fn test_unsubscribe() {
+ pub async fn test_unsubscribe() {
tracing_subscriber::fmt()
.with_thread_names(true)
.with_file(true)
@@ -668,33 +715,35 @@ pub mod tests {
.init();
let nacos_registry_url =
Url::from_url("nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=7015").unwrap();
- let mut registry = NacosRegistry::new(nacos_registry_url);
+ let registry = NacosRegistry::new(nacos_registry_url);
let mut service_url =
Url::from_url("tri://127.0.0.1:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&methods=sayHello,sayHelloAsync&pid=7015&service-name-mapping=true&side=provider×tamp=1670060843807").unwrap();
service_url
.params
.insert(SIDE_KEY.to_owned(), PROVIDER_SIDE.to_owned());
- let ret = registry.register(service_url);
+ let ret = registry.register(service_url).await;
info!("register result: {:?}", ret);
let subscribe_url =
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
- let listener = Arc::new(TestNotifyListener);
-
- let ret = registry.subscribe(subscribe_url, listener.clone());
+ let ret = registry.subscribe(subscribe_url).await;
if let Err(e) = ret {
error!("error message: {:?}", e);
return;
}
+ let mut rx = ret.unwrap();
+ let change = rx.recv().await;
+ info!("receive change: {:?}", change);
+
let sleep_millis = time::Duration::from_secs(40);
thread::sleep(sleep_millis);
let unsubscribe_url =
Url::from_url("provider://192.168.0.102:50052/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&background=false&bind.ip=192.168.0.102&bind.port=50052&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&ipv6=fd00:6cb1:58a2:8ddf:0:0:0:1000&methods=sayHello,sayHelloAsync&pid=44270&service-name-mapping=true&side=provider").unwrap();
- let ret = registry.unsubscribe(unsubscribe_url, listener.clone());
+ let ret = registry.unsubscribe(unsubscribe_url).await;
if let Err(e) = ret {
error!("error message: {:?}", e);
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index 2df5499..ebcb269 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -9,10 +9,13 @@ repository = "https://github.com/apache/dubbo-rust.git"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-zookeeper = "0.7.0"
+zookeeper = "0.8.0"
dubbo.workspace = true
+anyhow.workspace = true
serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
urlencoding.workspace = true
dubbo-logger.workspace = true
dubbo-base.workspace = true
+tokio.workspace = true
+async-trait.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index e8c2c5c..f3733d5 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -17,27 +17,20 @@
#![allow(unused_variables, dead_code, missing_docs)]
-use std::{
- collections::{HashMap, HashSet},
- env,
- sync::{Arc, Mutex, RwLock},
- time::Duration,
-};
+use std::{collections::HashMap, env, sync::Arc, time::Duration};
+use async_trait::async_trait;
use dubbo_base::{
constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
Url,
};
use dubbo_logger::tracing::{debug, error, info};
use serde::{Deserialize, Serialize};
-#[allow(unused_imports)]
+use tokio::{select, sync::mpsc};
use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher,
ZooKeeper};
use dubbo::{
- registry::{
- memory_registry::MemoryRegistry, NotifyListener, Registry,
RegistryNotifyListener,
- ServiceEvent,
- },
+ registry::n_registry::{DiscoverStream, Registry, ServiceChange},
StdError,
};
@@ -54,12 +47,9 @@ impl Watcher for LoggingWatcher {
}
}
-//#[derive(Debug)]
pub struct ZookeeperRegistry {
root_path: String,
zk_client: Arc<ZooKeeper>,
- listeners: RwLock<HashMap<String, RegistryNotifyListener>>,
- memory_registry: Arc<Mutex<MemoryRegistry>>,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -91,24 +81,6 @@ impl ZookeeperRegistry {
ZookeeperRegistry {
root_path: "/services".to_string(),
zk_client: Arc::new(zk_client),
- listeners: RwLock::new(HashMap::new()),
- memory_registry: Arc::new(Mutex::new(MemoryRegistry::default())),
- }
- }
-
- fn create_listener(
- &self,
- path: String,
- service_name: String,
- listener: RegistryNotifyListener,
- ) -> ServiceInstancesChangedListener {
- let mut service_names = HashSet::new();
- service_names.insert(service_name.clone());
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path,
- service_name: service_name.clone(),
- listener,
}
}
@@ -127,10 +99,6 @@ impl ZookeeperRegistry {
s.to_string()
}
- pub fn get_client(&self) -> Arc<ZooKeeper> {
- self.zk_client.clone()
- }
-
// If the parent node does not exist in the ZooKeeper,
Err(ZkError::NoNode) will be returned.
pub fn create_path(
&self,
@@ -154,7 +122,7 @@ impl ZookeeperRegistry {
Ok(_) => Ok(()),
Err(err) => {
error!("zk path {} parent not exists.", path);
- Err(Box::try_from(err).unwrap())
+ Err(err.into())
}
}
}
@@ -176,16 +144,12 @@ impl ZookeeperRegistry {
current.push('/');
current.push_str(node_key);
if !self.exists_path(current.as_str()) {
- let new_create_mode = match children == node_key {
- true => create_mode,
- false => CreateMode::Persistent,
- };
- let new_data = match children == node_key {
- true => data,
- false => "",
+ let (new_create_mode, new_data) = match children == node_key {
+ true => (create_mode, data),
+ false => (CreateMode::Persistent, ""),
};
- self.create_path(current.as_str(), new_data, new_create_mode)
- .unwrap();
+
+ self.create_path(current.as_str(), new_data, new_create_mode)?;
}
}
Ok(())
@@ -193,7 +157,7 @@ impl ZookeeperRegistry {
pub fn delete_path(&self, path: &str) {
if self.exists_path(path) {
- self.get_client().delete(path, None).unwrap()
+ self.zk_client.delete(path, None).unwrap()
}
}
@@ -213,6 +177,65 @@ impl ZookeeperRegistry {
None
}
}
+
+ pub fn diff<'a>(
+ old_urls: &'a Vec<String>,
+ new_urls: &'a Vec<String>,
+ ) -> (Vec<String>, Vec<String>) {
+ let old_urls_map: HashMap<String, String> = old_urls
+ .iter()
+ .map(|url| dubbo_base::Url::from_url(url.as_str()))
+ .filter(|item| item.is_some())
+ .map(|item| item.unwrap())
+ .map(|item| {
+ let ip_port = item.get_ip_port();
+ let url = item.encoded_raw_url_string();
+ (ip_port, url)
+ })
+ .collect();
+
+ let new_urls_map: HashMap<String, String> = new_urls
+ .iter()
+ .map(|url| dubbo_base::Url::from_url(url.as_str()))
+ .filter(|item| item.is_some())
+ .map(|item| item.unwrap())
+ .map(|item| {
+ let ip_port = item.get_ip_port();
+ let url = item.encoded_raw_url_string();
+ (ip_port, url)
+ })
+ .collect();
+
+ let mut add_hosts = Vec::new();
+ let mut removed_hosts = Vec::new();
+
+ for (key, new_host) in new_urls_map.iter() {
+ let old_host = old_urls_map.get(key);
+ match old_host {
+ None => {
+ add_hosts.push(new_host.clone());
+ }
+ Some(old_host) => {
+ if !old_host.eq(new_host) {
+ removed_hosts.push(old_host.clone());
+ add_hosts.push(new_host.clone());
+ }
+ }
+ }
+ }
+
+ for (key, old_host) in old_urls_map.iter() {
+ let new_host = old_urls_map.get(key);
+ match new_host {
+ None => {
+ removed_hosts.push(old_host.clone());
+ }
+ Some(_) => {}
+ }
+ }
+
+ (removed_hosts, add_hosts)
+ }
}
impl Default for ZookeeperRegistry {
@@ -236,8 +259,9 @@ impl Default for ZookeeperRegistry {
}
}
+#[async_trait]
impl Registry for ZookeeperRegistry {
- fn register(&mut self, url: Url) -> Result<(), StdError> {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
debug!("register url: {}", url);
let zk_path = format!(
"/{}/{}/{}/{}",
@@ -250,7 +274,7 @@ impl Registry for ZookeeperRegistry {
Ok(())
}
- fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
let zk_path = format!(
"/{}/{}/{}/{}",
DUBBO_KEY,
@@ -263,194 +287,155 @@ impl Registry for ZookeeperRegistry {
}
// for consumer to find the changes of providers
- fn subscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), StdError> {
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
let service_name = url.get_service_name();
let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name,
PROVIDERS_KEY);
- if self
- .listeners
- .read()
- .unwrap()
- .get(service_name.as_str())
- .is_some()
- {
- return Ok(());
- }
- self.listeners
- .write()
- .unwrap()
- .insert(service_name.to_string(), listener.clone());
+ debug!("subscribe service: {}", zk_path);
- let zk_listener =
- self.create_listener(zk_path.clone(), service_name.to_string(),
listener.clone());
+ let (listener, mut change_rx) = ZooKeeperListener::new();
+ let arc_listener = Arc::new(listener);
- let zk_changed_paths = self.zk_client.get_children_w(&zk_path,
zk_listener);
- let result = match zk_changed_paths {
- Err(err) => {
- error!("zk subscribe error: {}", err);
- Vec::new()
+ let watcher = ZooKeeperWatcher::new(arc_listener.clone(),
zk_path.clone());
+
+ let (discover_tx, discover_rx) = mpsc::channel(64);
+
+ let zk_client_in_task = self.zk_client.clone();
+ let zk_path_in_task = zk_path.clone();
+ let service_name_in_task = service_name.clone();
+ let arc_listener_in_task = arc_listener.clone();
+ tokio::spawn(async move {
+ let zk_client = zk_client_in_task;
+ let zk_path = zk_path_in_task;
+ let service_name = service_name_in_task;
+ let listener = arc_listener_in_task;
+
+ let mut current_urls = Vec::new();
+
+ loop {
+ let changed = select! {
+ _ = discover_tx.closed() => {
+ info!("discover task quit, discover channel closed");
+ None
+ },
+ changed = change_rx.recv() => {
+ changed
+ }
+ };
+
+ match changed {
+ Some(_) => {
+ let zookeeper_watcher =
+ ZooKeeperWatcher::new(listener.clone(),
zk_path.clone());
+
+ match zk_client.get_children_w(&zk_path,
zookeeper_watcher) {
+ Ok(children) => {
+ let (removed, add) =
+ ZookeeperRegistry::diff(¤t_urls,
&children);
+
+ for url in removed {
+ match discover_tx
+
.send(Ok(ServiceChange::Remove(url.clone())))
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("send service change
failed: {:?}, maybe user unsubscribe", e);
+ break;
+ }
+ }
+ }
+
+ for url in add {
+ match discover_tx
+
.send(Ok(ServiceChange::Insert(url.clone(), ())))
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("send service change
failed: {:?}, maybe user unsubscribe", e);
+ break;
+ }
+ }
+ }
+
+ current_urls = children;
+ }
+ Err(err) => {
+ error!("zk subscribe error: {}", err);
+ break;
+ }
+ }
+ }
+ None => {
+ error!("receive service change task quit, unsubscribe
{}.", zk_path);
+ break;
+ }
+ }
}
- Ok(urls) => urls
- .iter()
- .map(|node_key| {
- let provider_url: Url = urlencoding::decode(node_key)
- .unwrap()
- .to_string()
- .as_str()
- .into();
- provider_url
- })
- .collect(),
- };
- info!("notifying {}->{:?}", service_name, result);
- listener.notify(ServiceEvent {
- key: service_name,
- action: String::from("ADD"),
- service: result,
+
+ debug!("unsubscribe service: {}", zk_path);
});
- Ok(())
- }
- fn unsubscribe(&self, url: Url, listener: RegistryNotifyListener) ->
Result<(), StdError> {
- todo!()
+ arc_listener.changed(zk_path);
+
+ Ok(discover_rx)
}
-}
-pub struct ServiceInstancesChangedListener {
- zk_client: Arc<ZooKeeper>,
- path: String,
- service_name: String,
- listener: RegistryNotifyListener,
-}
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ let service_name = url.get_service_name();
+ let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name,
PROVIDERS_KEY);
-impl Watcher for ServiceInstancesChangedListener {
- fn handle(&self, event: WatchedEvent) {
- if let (WatchedEventType::NodeChildrenChanged, Some(path)) =
(event.event_type, event.path)
- {
- let event_path = path.clone();
- let dirs = self
- .zk_client
- .get_children(&event_path, false)
- .expect("msg");
- let result: Vec<Url> = dirs
- .iter()
- .map(|node_key| {
- let provider_url: Url = node_key.as_str().into();
- provider_url
- })
- .collect();
- let res = self.zk_client.get_children_w(
- &path,
- ServiceInstancesChangedListener {
- zk_client: Arc::clone(&self.zk_client),
- path: path.clone(),
- service_name: self.service_name.clone(),
- listener: Arc::clone(&self.listener),
- },
- );
-
- info!("notify {}->{:?}", self.service_name, result);
- self.listener.notify(ServiceEvent {
- key: self.service_name.clone(),
- action: String::from("ADD"),
- service: result,
- });
- }
+ info!("unsubscribe service: {}", zk_path);
+ Ok(())
}
}
-impl NotifyListener for ServiceInstancesChangedListener {
- fn notify(&self, event: ServiceEvent) {
- self.listener.notify(event);
+pub struct ZooKeeperListener {
+ tx: mpsc::Sender<String>,
+}
+
+impl ZooKeeperListener {
+ pub fn new() -> (ZooKeeperListener, mpsc::Receiver<String>) {
+ let (tx, rx) = mpsc::channel(64);
+ let this = ZooKeeperListener { tx };
+ (this, rx)
}
- fn notify_all(&self, event: ServiceEvent) {
- self.listener.notify(event);
+ pub fn changed(&self, path: String) {
+ match self.tx.try_send(path) {
+ Ok(_) => {}
+ Err(err) => {
+ error!("send change list to listener occur an error: {}", err);
+ return;
+ }
+ }
}
}
-#[cfg(test)]
-mod tests {
- use std::sync::Arc;
-
- use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
-
- use crate::ZookeeperRegistry;
-
- struct TestZkWatcher {
- pub watcher: Arc<Option<TestZkWatcher>>,
- }
+pub struct ZooKeeperWatcher {
+ listener: Arc<ZooKeeperListener>,
+ path: String,
+}
- impl Watcher for TestZkWatcher {
- fn handle(&self, event: WatchedEvent) {
- println!("event: {:?}", event);
- }
+impl ZooKeeperWatcher {
+ pub fn new(listener: Arc<ZooKeeperListener>, path: String) ->
ZooKeeperWatcher {
+ ZooKeeperWatcher { listener, path }
}
+}
- #[test]
- fn zk_read_write_watcher() {
- //
https://github.com/bonifaido/rust-zookeeper/blob/master/examples/zookeeper_example.rs
- // using ENV to set zookeeper server urls
- let zkr = ZookeeperRegistry::default();
- let zk_client = zkr.get_client();
- let watcher = TestZkWatcher {
- watcher: Arc::new(None),
- };
- if zk_client.exists("/test", true).is_err() {
- zk_client
- .create(
- "/test",
- vec![1, 3],
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- )
- .unwrap();
- }
- let zk_res = zk_client.create(
- "/test",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let result = zk_client.get_children_w("/test", watcher);
- assert!(result.is_ok());
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/a", None).unwrap();
- }
- if zk_client.exists("/test/a", true).is_err() {
- zk_client.delete("/test/b", None).unwrap();
+impl Watcher for ZooKeeperWatcher {
+ fn handle(&self, event: WatchedEvent) {
+ info!("receive zookeeper event: {:?}", event);
+ let event_type: WatchedEventType = event.event_type;
+ match event_type {
+ WatchedEventType::None => {
+ info!("event type is none, ignore it.");
+ return;
+ }
+ _ => {}
}
- let zk_res = zk_client.create(
- "/test/a",
- "hello".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let zk_res = zk_client.create(
- "/test/b",
- "world".into(),
- Acl::open_unsafe().clone(),
- CreateMode::Ephemeral,
- );
- let test_a_result = zk_client.get_data("/test", true);
- assert!(test_a_result.is_ok());
- let vec1 = test_a_result.unwrap().0;
- // data in /test should equals to "hello"
- assert_eq!(String::from_utf8(vec1).unwrap(), "hello");
- zk_client.close().unwrap()
- }
- #[test]
- fn create_path_with_parent_check() {
- let zkr = ZookeeperRegistry::default();
- let path = "/du1bbo/test11111";
- let data = "hello";
- // creating a child on a not exists parent, throw a NoNode error.
- // let result = zkr.create_path(path, data, CreateMode::Ephemeral);
- // assert!(result.is_err());
- let create_with_parent_check_result =
- zkr.create_path_with_parent_check(path, data,
CreateMode::Ephemeral);
- assert!(create_with_parent_check_result.is_ok());
- assert_eq!(data, zkr.get_data(path, false).unwrap());
+ self.listener.changed(self.path.clone());
}
}