This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new aeeb354 Implement service discovery (#103)
aeeb354 is described below
commit aeeb3546be6a4ba01098fb77de35f29ff3b167e0
Author: Robert LU <[email protected]>
AuthorDate: Thu Feb 9 20:10:49 2023 +0800
Implement service discovery (#103)
* implement service discovery
* update github-actions.yml
* use new pattern
* Update zookeeper_registry.rs
---
.github/workflows/github-actions.yml | 42 +--
Cargo.toml | 1 +
config/src/config.rs | 2 +-
dubbo-build/src/client.rs | 134 ++++++---
dubbo/Cargo.toml | 1 +
dubbo/src/cluster/directory.rs | 150 ++++++++++
dubbo/src/cluster/mod.rs | 18 ++
dubbo/src/codegen.rs | 6 +
dubbo/src/invocation.rs | 32 +++
dubbo/src/lib.rs | 1 +
dubbo/src/protocol/triple/triple_invoker.rs | 18 +-
dubbo/src/protocol/triple/triple_protocol.rs | 5 +-
dubbo/src/registry/memory_registry.rs | 12 +-
dubbo/src/registry/mod.rs | 25 +-
dubbo/src/registry/protocol.rs | 4 +-
dubbo/src/triple/client/builder.rs | 67 ++---
dubbo/src/triple/client/triple.rs | 127 ++++-----
examples/echo/Cargo.toml | 3 +
examples/echo/src/echo/client.rs | 9 +-
.../hello_echo.rs => echo/grpc.examples.echo.rs} | 54 ++--
examples/echo/src/protos/hello_echo.rs | 54 ++--
examples/greeter/Cargo.toml | 3 +
examples/greeter/proto/greeter.proto | 2 +-
examples/greeter/src/greeter/client.rs | 23 +-
registry-zookeeper/Cargo.toml | 14 +
registry-zookeeper/LICENSE | 202 ++++++++++++++
registry-zookeeper/src/lib.rs | 27 ++
registry-zookeeper/src/zookeeper_registry.rs | 306 +++++++++++++++++++++
28 files changed, 1095 insertions(+), 247 deletions(-)
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index e69ee90..d6c85bb 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -23,6 +23,18 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
+ - name: Set up cargo cache
+ uses: actions/cache@v3
+ continue-on-error: false
+ with:
+ path: |
+ ~/.cargo/bin/
+ ~/.cargo/registry/index/
+ ~/.cargo/registry/cache/
+ ~/.cargo/git/db/
+ target/
+ key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+ restore-keys: ${{ runner.os }}-cargo-
- name: setup protoc
run: |
mkdir $HOME/protoc/ -p &&
@@ -32,19 +44,9 @@ jobs:
unzip /tmp/protoc-21.9-linux-x86_64.zip &&
echo "$HOME/protoc/bin" >> $GITHUB_PATH
shell: bash
- - run: cargo check
-
- fmt:
- name: Rustfmt
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@main
- - uses: actions-rs/toolchain@v1
- with:
- toolchain: stable
- run: rustup component add rustfmt
- run: cargo fmt --all -- --check
+ - run: cargo check
example-greeter:
name: example/greeter
@@ -54,15 +56,6 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- - name: setup protoc
- run: |
- mkdir $HOME/protoc/ -p &&
- cd $HOME/protoc/ &&
- curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
- https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip &&
- unzip /tmp/protoc-21.9-linux-x86_64.zip &&
- echo "$HOME/protoc/bin" >> $GITHUB_PATH
- shell: bash
- name: Set up cargo cache
uses: actions/cache@v3
continue-on-error: false
@@ -75,6 +68,15 @@ jobs:
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-
+ - name: setup protoc
+ run: |
+ mkdir $HOME/protoc/ -p &&
+ cd $HOME/protoc/ &&
+ curl --location --silent --output /tmp/protoc-21.9-linux-x86_64.zip \
+ https://github.com/protocolbuffers/protobuf/releases/download/v21.9/protoc-21.9-linux-x86_64.zip &&
+ unzip /tmp/protoc-21.9-linux-x86_64.zip &&
+ echo "$HOME/protoc/bin" >> $GITHUB_PATH
+ shell: bash
- run: cargo build
working-directory: examples/greeter
- name: example greeter
diff --git a/Cargo.toml b/Cargo.toml
index 140c079..d3942a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
members = [
"xds",
"registry",
+ "registry-zookeeper",
"metadata",
"common",
"config",
diff --git a/config/src/config.rs b/config/src/config.rs
index 672319a..500fd33 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -77,7 +77,7 @@ impl RootConfig {
v
}
Err(err) => {
- tracing::error!(
+ tracing::warn!(
"error loading config_path: {:?}, use default path: {:?}",
err,
DUBBO_CONFIG_PATH
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 8d58ab1..05e8e89 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -66,11 +66,11 @@ pub fn generate<T: Service>(
#service_doc
#(#struct_attributes)*
#[derive(Debug, Clone, Default)]
- pub struct #service_ident<T> {
- inner: TripleClient<T>,
+ pub struct #service_ident {
+ inner: TripleClient,
}
- impl #service_ident<ClientBoxService> {
+ impl #service_ident {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
#service_ident {
@@ -78,33 +78,27 @@ pub fn generate<T: Service>(
}
}
- pub fn build(builder: ClientBuilder) -> Self {
- Self {
- inner: TripleClient::with_builder(builder),
- }
- }
- }
+ // pub fn build(builder: ClientBuilder) -> Self {
+ // Self {
+ // inner: TripleClient::new(builder),
+ // }
+ // }
- impl<T> #service_ident<T>
- where
- T: Service<http::Request<hyperBody>, Response =
http::Response<BoxBody>>,
- T::Error: Into<StdError>,
- {
- pub fn new(inner: T, builder: ClientBuilder) -> Self {
+ pub fn new(builder: ClientBuilder) -> Self {
Self {
- inner: TripleClient::new(inner, builder),
+ inner: TripleClient::new(builder),
}
}
- pub fn with_filter<F>(self, filter: F) ->
#service_ident<FilterService<T, F>>
- where
- F: Filter,
- {
- let inner = self.inner.with_filter(filter);
- #service_ident {
- inner,
- }
- }
+ // pub fn with_filter<F>(self, filter: F) ->
#service_ident<FilterService<T, F>>
+ // where
+ // F: Filter,
+ // {
+ // let inner = self.inner.with_filter(filter);
+ // #service_ident {
+ // inner,
+ // }
+ // }
#methods
@@ -123,6 +117,12 @@ fn generate_methods<T: Service>(
let package = if emit_package { service.package() } else { "" };
for method in service.methods() {
+ let service_unique_name = format!(
+ "{}{}{}",
+ package,
+ if package.is_empty() { "" } else { "." },
+ service.identifier()
+ );
let path = format!(
"/{}{}{}/{}",
package,
@@ -134,14 +134,34 @@ fn generate_methods<T: Service>(
stream.extend(generate_doc_comments(method.comment()));
let method = match (method.client_streaming(),
method.server_streaming()) {
- (false, false) => generate_unary(&method, proto_path,
compile_well_known_types, path),
- (false, true) => {
- generate_server_streaming(&method, proto_path,
compile_well_known_types, path)
- }
- (true, false) => {
- generate_client_streaming(&method, proto_path,
compile_well_known_types, path)
- }
- (true, true) => generate_streaming(&method, proto_path,
compile_well_known_types, path),
+ (false, false) => generate_unary(
+ service_unique_name,
+ &method,
+ proto_path,
+ compile_well_known_types,
+ path,
+ ),
+ (false, true) => generate_server_streaming(
+ service_unique_name,
+ &method,
+ proto_path,
+ compile_well_known_types,
+ path,
+ ),
+ (true, false) => generate_client_streaming(
+ service_unique_name,
+ &method,
+ proto_path,
+ compile_well_known_types,
+ path,
+ ),
+ (true, true) => generate_streaming(
+ service_unique_name,
+ &method,
+ proto_path,
+ compile_well_known_types,
+ path,
+ ),
};
stream.extend(method);
@@ -151,6 +171,7 @@ fn generate_methods<T: Service>(
}
fn generate_unary<T: Method>(
+ service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
@@ -159,6 +180,7 @@ fn generate_unary<T: Method>(
let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
+ let method_name = method.identifier();
quote! {
pub async fn #ident(
@@ -166,19 +188,22 @@ fn generate_unary<T: Method>(
request: Request<#request>,
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
+ let invocation = RpcInvocation::default()
+ .with_servie_unique_name(String::from(#service_unique_name))
+ .with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
- self.inner
- .unary(
+ self.inner.unary(
request,
codec,
path,
- )
- .await
+ invocation,
+ ).await
}
}
}
fn generate_server_streaming<T: Method>(
+ service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
@@ -188,6 +213,7 @@ fn generate_server_streaming<T: Method>(
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
+ let method_name = method.identifier();
quote! {
pub async fn #ident(
@@ -196,13 +222,22 @@ fn generate_server_streaming<T: Method>(
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
+ let invocation = RpcInvocation::default()
+ .with_servie_unique_name(String::from(#service_unique_name))
+ .with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
- self.inner.server_streaming(request, codec, path).await
+ self.inner.server_streaming(
+ request,
+ codec,
+ path,
+ invocation,
+ ).await
}
}
}
fn generate_client_streaming<T: Method>(
+ service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
@@ -212,6 +247,7 @@ fn generate_client_streaming<T: Method>(
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
+ let method_name = method.identifier();
quote! {
pub async fn #ident(
@@ -219,13 +255,22 @@ fn generate_client_streaming<T: Method>(
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
+ let invocation = RpcInvocation::default()
+ .with_servie_unique_name(String::from(#service_unique_name))
+ .with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
- self.inner.client_streaming(request, codec, path).await
+ self.inner.client_streaming(
+ request,
+ codec,
+ path,
+ invocation,
+ ).await
}
}
}
fn generate_streaming<T: Method>(
+ service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
@@ -235,6 +280,7 @@ fn generate_streaming<T: Method>(
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
+ let method_name = method.identifier();
quote! {
pub async fn #ident(
@@ -242,8 +288,16 @@ fn generate_streaming<T: Method>(
request: impl IntoStreamingRequest<Message = #request>
) -> Result<Response<Decoding<#response>>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
+ let invocation = RpcInvocation::default()
+ .with_servie_unique_name(String::from(#service_unique_name))
+ .with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
- self.inner.bidi_streaming(request, codec, path).await
+ self.inner.bidi_streaming(
+ request,
+ codec,
+ path,
+ invocation,
+ ).await
}
}
}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 145b4a0..6a51730 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -24,6 +24,7 @@ async-trait = "0.1.56"
tower-layer = "0.3"
bytes = "1.0"
pin-project = "1.0"
+rand = "0.8.5"
serde_json = "1.0.82"
serde = {version="1.0.138", features = ["derive"]}
futures = "0.3"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
new file mode 100644
index 0000000..4952de7
--- /dev/null
+++ b/dubbo/src/cluster/directory.rs
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::str::FromStr;
+use std::sync::{Arc, RwLock};
+
+use crate::common::url::Url;
+use crate::invocation::{Invocation, RpcInvocation};
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, RegistryWrapper};
+
+/// Directory.
+///
+/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
+pub trait Directory: Debug + DirectoryClone {
+ fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+}
+
+pub trait DirectoryClone {
+ fn clone_box(&self) -> Box<dyn Directory>;
+}
+
+impl<T> DirectoryClone for T
+where
+ T: 'static + Directory + Clone,
+{
+ fn clone_box(&self) -> Box<dyn Directory> {
+ Box::new(self.clone())
+ }
+}
+
+impl Clone for Box<dyn Directory> {
+ fn clone(&self) -> Box<dyn Directory> {
+ self.clone_box()
+ }
+}
+
+#[derive(Debug)]
+pub struct StaticDirectory {
+ uri: http::Uri,
+}
+
+impl StaticDirectory {
+ pub fn new(host: &str) -> StaticDirectory {
+ let uri = match http::Uri::from_str(host) {
+ Ok(v) => v,
+ Err(err) => {
+ tracing::error!("http uri parse error: {}, host: {}", err,
host);
+ panic!("http uri parse error: {}, host: {}", err, host)
+ }
+ };
+ StaticDirectory { uri: uri }
+ }
+
+ pub fn from_uri(uri: &http::Uri) -> StaticDirectory {
+ StaticDirectory { uri: uri.clone() }
+ }
+}
+
+impl Directory for StaticDirectory {
+ fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+ let url = Url::from_url(&format!(
+ "triple://{}:{}/{}",
+ self.uri.host().unwrap(),
+ self.uri.port().unwrap(),
+ invocation.get_target_service_unique_name(),
+ ))
+ .unwrap();
+ vec![url]
+ }
+}
+
+impl DirectoryClone for StaticDirectory {
+ fn clone_box(&self) -> Box<dyn Directory> {
+ Box::new(StaticDirectory {
+ uri: self.uri.clone(),
+ })
+ }
+}
+
+#[derive(Debug)]
+pub struct RegistryDirectory {
+ registry: RegistryWrapper,
+ service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
+
+impl RegistryDirectory {
+ pub fn new(registry: BoxRegistry) -> RegistryDirectory {
+ RegistryDirectory {
+ registry: RegistryWrapper {
+ registry: Some(registry),
+ },
+ service_instances: Arc::new(RwLock::new(HashMap::new())),
+ }
+ }
+}
+
+impl DirectoryClone for RegistryDirectory {
+ fn clone_box(&self) -> Box<dyn Directory> {
+ todo!()
+ }
+}
+
+impl Directory for RegistryDirectory {
+ fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+ let service_name = invocation.get_target_service_unique_name();
+
+ let url = Url::from_url(&format!(
+ "triple://{}:{}/{}",
+ "127.0.0.1", "8888", service_name
+ ))
+ .unwrap();
+
+ self.registry
+ .registry
+ .as_ref()
+ .expect("msg")
+ .subscribe(
+ url,
+ MemoryNotifyListener {
+ service_instances: Arc::clone(&self.service_instances),
+ },
+ )
+ .expect("subscribe");
+
+ let map = self
+ .service_instances
+ .read()
+ .expect("service_instances.read");
+ let binding = Vec::new();
+ let url_vec = map.get(&service_name).unwrap_or(&binding);
+ url_vec.to_vec()
+ }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
new file mode 100644
index 0000000..84980ea
--- /dev/null
+++ b/dubbo/src/cluster/mod.rs
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod directory;
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index ef65646..3535fbf 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,9 +24,15 @@ pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
+pub use super::invocation::RpcInvocation;
pub use super::invocation::{IntoStreamingRequest, Request, Response};
pub use super::protocol::triple::triple_invoker::TripleInvoker;
pub use super::protocol::Invoker;
+pub use super::registry::BoxRegistry;
+pub use super::registry::Registry;
+pub use super::registry::RegistryWrapper;
pub use super::triple::client::TripleClient;
pub use super::triple::codec::prost::ProstCodec;
pub use super::triple::codec::Codec;
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 5a80e9a..423e9c7 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -189,3 +189,35 @@ impl Metadata {
header
}
}
+
+pub trait Invocation {
+ fn get_target_service_unique_name(&self) -> String;
+ fn get_method_name(&self) -> String;
+}
+
+#[derive(Default)]
+pub struct RpcInvocation {
+ target_service_unique_name: String,
+ method_name: String,
+}
+
+impl RpcInvocation {
+ pub fn with_servie_unique_name(mut self, service_unique_name: String) ->
Self {
+ self.target_service_unique_name = service_unique_name;
+ self
+ }
+ pub fn with_method_name(mut self, method_name: String) -> Self {
+ self.method_name = method_name;
+ self
+ }
+}
+
+impl Invocation for RpcInvocation {
+ fn get_target_service_unique_name(&self) -> String {
+ self.target_service_unique_name.clone()
+ }
+
+ fn get_method_name(&self) -> String {
+ self.method_name.clone()
+ }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 68bbea5..bf97ef7 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+pub mod cluster;
pub mod codegen;
pub mod common;
pub mod context;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 1cbe2ed..2c03fa7 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,13 +15,11 @@
* limitations under the License.
*/
-use std::str::FromStr;
-
use tower_service::Service;
use crate::common::url::Url;
use crate::protocol::Invoker;
-use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
+use crate::triple::client::builder::ClientBoxService;
pub struct TripleInvoker {
url: Url,
@@ -29,13 +27,13 @@ pub struct TripleInvoker {
}
impl TripleInvoker {
- pub fn new(url: Url) -> TripleInvoker {
- let uri = http::Uri::from_str(&url.to_url()).unwrap();
- Self {
- url,
- conn: ClientBuilder::from(uri).connect(),
- }
- }
+ // pub fn new(url: Url) -> TripleInvoker {
+ // let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ // Self {
+ // url,
+ // conn: ClientBuilder::from_uri(&uri).build()connect(),
+ // }
+ // }
}
impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 037bfb7..4d56b7a 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -68,8 +68,9 @@ impl Protocol for TripleProtocol {
Box::new(TripleExporter::new())
}
- async fn refer(self, url: Url) -> Self::Invoker {
- TripleInvoker::new(url)
+ async fn refer(self, _url: Url) -> Self::Invoker {
+ todo!()
+ // TripleInvoker::new(url)
// Self::Invoker
}
}
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index 67df5c7..4d0350e 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -20,6 +20,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
+use crate::common::url::Url;
+
use super::{NotifyListener, Registry};
// 从url中获取服务注册的元数据
@@ -100,11 +102,17 @@ impl Registry for MemoryRegistry {
}
}
-pub struct MemoryNotifyListener {}
+pub struct MemoryNotifyListener {
+ pub service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
impl NotifyListener for MemoryNotifyListener {
fn notify(&self, event: super::ServiceEvent) {
- todo!()
+ let mut map = self.service_instances.write().expect("msg");
+ match event.action.as_str() {
+ "ADD" => map.insert(event.key, event.service),
+ &_ => todo!(),
+ };
}
fn notify_all(&self, event: super::ServiceEvent) {
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index cbae16b..8c56692 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -19,6 +19,8 @@
pub mod memory_registry;
pub mod protocol;
+use std::fmt::Debug;
+
use crate::common::url::Url;
pub trait Registry {
@@ -37,10 +39,27 @@ pub trait NotifyListener {
}
pub struct ServiceEvent {
- key: String,
- action: String,
- service: Url,
+ pub key: String,
+ pub action: String,
+ pub service: Vec<Url>,
}
pub type BoxRegistry =
Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> +
Send + Sync>;
+
+#[derive(Default)]
+pub struct RegistryWrapper {
+ pub registry: Option<Box<dyn Registry<NotifyListener =
memory_registry::MemoryNotifyListener>>>,
+}
+
+impl Clone for RegistryWrapper {
+ fn clone(&self) -> Self {
+ Self { registry: None }
+ }
+}
+
+impl Debug for RegistryWrapper {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("RegistryWrapper").finish()
+ }
+}
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 933975c..bf6a8a5 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -20,7 +20,6 @@ use std::sync::{Arc, RwLock};
use super::memory_registry::MemoryRegistry;
use super::BoxRegistry;
-use crate::codegen::TripleInvoker;
use crate::common::url::Url;
use crate::protocol::triple::triple_exporter::TripleExporter;
use crate::protocol::triple::triple_protocol::TripleProtocol;
@@ -104,6 +103,7 @@ impl Protocol for RegistryProtocol {
// get Registry from registry_url
// init directory based on registry_url and Registry
// init Cluster based on Directory generates Invoker
- Box::new(TripleInvoker::new(url))
+ todo!()
+ //Box::new(TripleInvoker::new(url))
}
}
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 21afc0c..40a6ffb 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,35 +15,46 @@
* limitations under the License.
*/
-use http::Uri;
-use hyper::client::conn::Builder;
-use tokio::time::Duration;
-use tower::ServiceBuilder;
-
-use crate::triple::transport::connection::Connection;
+use crate::cluster::directory::StaticDirectory;
+use crate::codegen::Directory;
+use crate::triple::compression::CompressionEncoding;
use crate::utils::boxed::BoxService;
+use super::TripleClient;
+
pub type ClientBoxService =
BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>,
crate::Error>;
#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
- pub uri: Uri,
pub timeout: Option<u64>,
pub connector: &'static str,
+ directory: Option<Box<dyn Directory>>,
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
ClientBuilder {
- uri: Uri::builder().build().unwrap(),
timeout: None,
connector: "",
+ directory: None,
+ }
+ }
+
+ pub fn from_static(host: &str) -> ClientBuilder {
+ Self {
+ timeout: None,
+ connector: "",
+ directory: Some(Box::new(StaticDirectory::new(&host))),
}
}
- pub fn from_static(s: &'static str) -> ClientBuilder {
- Self::from(Uri::from_static(s))
+ pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
+ Self {
+ timeout: None,
+ connector: "",
+ directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
+ }
}
pub fn with_timeout(self, timeout: u64) -> Self {
@@ -53,9 +64,17 @@ impl ClientBuilder {
}
}
+ /// host: http://0.0.0.0:8888
+ pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+ Self {
+ directory: Some(directory),
+ ..self
+ }
+ }
+
pub fn with_host(self, host: &'static str) -> Self {
Self {
- uri: Uri::from_static(host),
+ directory: Some(Box::new(StaticDirectory::new(&host))),
..self
}
}
@@ -67,28 +86,10 @@ impl ClientBuilder {
}
}
- pub fn connect(self) -> ClientBoxService {
- let builder = ServiceBuilder::new();
- let timeout = self.timeout.unwrap_or(5);
- let builder = builder.timeout(Duration::from_secs(timeout));
-
- let mut b = Builder::new();
- let hyper_builder = b.http2_only(true);
- let conn = Connection::new()
- .with_host(self.uri.clone())
- .with_connector(self.connector)
- .with_builder(hyper_builder.to_owned());
-
- BoxService::new(builder.service(conn))
- }
-}
-
-impl From<Uri> for ClientBuilder {
- fn from(u: Uri) -> Self {
- Self {
- uri: u,
- timeout: None,
- connector: "tcp",
+ pub fn build(self) -> TripleClient {
+ TripleClient {
+ send_compression_encoding: Some(CompressionEncoding::Gzip),
+ directory: self.directory,
}
}
}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index d4864ef..b9f6492 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -18,12 +18,15 @@
use std::str::FromStr;
use futures_util::{future, stream, StreamExt, TryStreamExt};
+
use http::HeaderValue;
+use rand::prelude::SliceRandom;
use tower_service::Service;
-use super::builder::{ClientBoxService, ClientBuilder};
-use crate::filter::service::FilterService;
-use crate::filter::Filter;
+use super::super::transport::connection::Connection;
+use super::builder::ClientBuilder;
+use crate::codegen::{Directory, RpcInvocation};
+
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
use crate::triple::codec::Codec;
use crate::triple::compression::CompressionEncoding;
@@ -31,77 +34,29 @@ use crate::triple::decode::Decoding;
use crate::triple::encode::encode;
#[derive(Debug, Clone, Default)]
-pub struct TripleClient<T> {
- builder: Option<ClientBuilder>,
- inner: T,
- send_compression_encoding: Option<CompressionEncoding>,
+pub struct TripleClient {
+ pub(crate) send_compression_encoding: Option<CompressionEncoding>,
+ pub(crate) directory: Option<Box<dyn Directory>>,
}
-impl TripleClient<ClientBoxService> {
+impl TripleClient {
pub fn connect(host: String) -> Self {
- let uri = match http::Uri::from_str(&host) {
- Ok(v) => v,
- Err(err) => {
- tracing::error!("http uri parse error: {}, host: {}", err,
host);
- panic!("http uri parse error: {}, host: {}", err, host)
- }
- };
+ let builder = ClientBuilder::from_static(&host);
- let builder = ClientBuilder::from(uri);
-
- TripleClient {
- builder: Some(builder.clone()),
- inner: builder.connect(),
- send_compression_encoding: Some(CompressionEncoding::Gzip),
- }
+ builder.build()
}
- pub fn with_builder(builder: ClientBuilder) -> Self {
- TripleClient {
- builder: Some(builder.clone()),
- inner: builder.connect(),
- send_compression_encoding: Some(CompressionEncoding::Gzip),
- }
+ pub fn new(builder: ClientBuilder) -> Self {
+ builder.build()
}
-}
-impl<T> TripleClient<T> {
- pub fn new(inner: T, builder: ClientBuilder) -> Self {
- TripleClient {
- builder: Some(builder),
- inner,
- send_compression_encoding: Some(CompressionEncoding::Gzip),
- }
- }
-
- pub fn with_filter<F>(self, filter: F) -> TripleClient<FilterService<T, F>>
- where
- F: Filter,
- {
- TripleClient::new(
- FilterService::new(self.inner, filter),
- self.builder.unwrap(),
- )
- }
-}
-
-impl<T> TripleClient<T>
-where
- T: Service<http::Request<hyper::Body>, Response =
http::Response<crate::BoxBody>>,
- T::Error: Into<crate::Error>,
-{
fn map_request(
&self,
+ uri: http::Uri,
path: http::uri::PathAndQuery,
body: hyper::Body,
) -> http::Request<hyper::Body> {
- let mut parts = match self.builder.as_ref() {
- Some(v) => v.to_owned().uri.into_parts(),
- None => {
- tracing::error!("client host is empty");
- return http::Request::new(hyper::Body::empty());
- }
- };
+ let mut parts = uri.into_parts();
parts.path_and_query = Some(path);
let uri = http::Uri::from_parts(parts).unwrap();
@@ -127,7 +82,7 @@ where
);
req.headers_mut().insert(
"content-type",
- HeaderValue::from_static("application/grpc+json"),
+ HeaderValue::from_static("application/grpc+proto"),
);
req.headers_mut()
.insert("user-agent",
HeaderValue::from_static("dubbo-rust/0.1.0"));
@@ -172,6 +127,7 @@ where
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
+ invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -187,10 +143,15 @@ where
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let req = self.map_request(path, body);
+ let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+
+ let req = self.map_request(http_uri.clone(), path, body);
- let response = self
- .inner
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -228,6 +189,7 @@ where
req: impl IntoStreamingRequest<Message = M1>,
mut codec: C,
path: http::uri::PathAndQuery,
+ invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -243,10 +205,15 @@ where
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let req = self.map_request(path, body);
+ let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+
+ let req = self.map_request(http_uri.clone(), path, body);
- let response = self
- .inner
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -268,6 +235,7 @@ where
req: impl IntoStreamingRequest<Message = M1>,
mut codec: C,
path: http::uri::PathAndQuery,
+ invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -283,10 +251,15 @@ where
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let req = self.map_request(path, body);
+ let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let response = self
- .inner
+ let req = self.map_request(http_uri.clone(), path, body);
+
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -324,6 +297,7 @@ where
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
+ invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -339,10 +313,15 @@ where
.into_stream();
let body = hyper::Body::wrap_stream(en);
- let req = self.map_request(path, body);
+ let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
+
+ let req = self.map_request(http_uri.clone(), path, body);
- let response = self
- .inner
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 9e794b0..f82ef80 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -23,8 +23,11 @@ prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
+hyper = { version = "0.14.19", features = ["full"]}
+
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version =
"0.2.0"}
[build-dependencies]
dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index 060a17f..7312f72 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -31,10 +31,11 @@ impl Filter for FakeFilter {
#[tokio::main]
async fn main() {
- let builder = ClientBuilder::new()
- .with_connector("unix")
- .with_host("unix://127.0.0.1:8888");
- let mut cli = EchoClient::build(builder);
+ // let builder = ClientBuilder::new()
+ // .with_connector("unix")
+ // .with_host("unix://127.0.0.1:8888");
+ let builder =
ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000);
+ let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli =
EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
let resp = cli
diff --git a/examples/echo/src/protos/hello_echo.rs
b/examples/echo/src/echo/grpc.examples.echo.rs
similarity index 90%
copy from examples/echo/src/protos/hello_echo.rs
copy to examples/echo/src/echo/grpc.examples.echo.rs
index 08a1d29..9c487e6 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/echo/grpc.examples.echo.rs
@@ -33,37 +33,19 @@ pub mod echo_client {
use dubbo::codegen::*;
/// Echo is the echo service.
#[derive(Debug, Clone, Default)]
- pub struct EchoClient<T> {
- inner: TripleClient<T>,
+ pub struct EchoClient {
+ inner: TripleClient,
}
- impl EchoClient<ClientBoxService> {
+ impl EchoClient {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
EchoClient { inner: cli }
}
- pub fn build(builder: ClientBuilder) -> Self {
+ pub fn new(builder: ClientBuilder) -> Self {
Self {
- inner: TripleClient::with_builder(builder),
+ inner: TripleClient::new(builder),
}
}
- }
- impl<T> EchoClient<T>
- where
- T: Service<http::Request<hyperBody>, Response =
http::Response<BoxBody>>,
- T::Error: Into<StdError>,
- {
- pub fn new(inner: T, builder: ClientBuilder) -> Self {
- Self {
- inner: TripleClient::new(inner, builder),
- }
- }
- pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T,
F>>
- where
- F: Filter,
- {
- let inner = self.inner.with_filter(filter);
- EchoClient { inner }
- }
/// UnaryEcho is unary echo.
pub async fn unary_echo(
&mut self,
@@ -71,8 +53,11 @@ pub mod echo_client {
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("UnaryEcho"));
let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
- self.inner.unary(request, codec, path).await
+ self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
@@ -81,10 +66,15 @@ pub mod echo_client {
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner.server_streaming(request, codec, path).await
+ self.inner
+ .server_streaming(request, codec, path, invocation)
+ .await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
@@ -93,10 +83,15 @@ pub mod echo_client {
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner.client_streaming(request, codec, path).await
+ self.inner
+ .client_streaming(request, codec, path, invocation)
+ .await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
@@ -105,10 +100,15 @@ pub mod echo_client {
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner.bidi_streaming(request, codec, path).await
+ self.inner
+ .bidi_streaming(request, codec, path, invocation)
+ .await
}
}
}
diff --git a/examples/echo/src/protos/hello_echo.rs
b/examples/echo/src/protos/hello_echo.rs
index 08a1d29..9c487e6 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -33,37 +33,19 @@ pub mod echo_client {
use dubbo::codegen::*;
/// Echo is the echo service.
#[derive(Debug, Clone, Default)]
- pub struct EchoClient<T> {
- inner: TripleClient<T>,
+ pub struct EchoClient {
+ inner: TripleClient,
}
- impl EchoClient<ClientBoxService> {
+ impl EchoClient {
pub fn connect(host: String) -> Self {
let cli = TripleClient::connect(host);
EchoClient { inner: cli }
}
- pub fn build(builder: ClientBuilder) -> Self {
+ pub fn new(builder: ClientBuilder) -> Self {
Self {
- inner: TripleClient::with_builder(builder),
+ inner: TripleClient::new(builder),
}
}
- }
- impl<T> EchoClient<T>
- where
- T: Service<http::Request<hyperBody>, Response =
http::Response<BoxBody>>,
- T::Error: Into<StdError>,
- {
- pub fn new(inner: T, builder: ClientBuilder) -> Self {
- Self {
- inner: TripleClient::new(inner, builder),
- }
- }
- pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T,
F>>
- where
- F: Filter,
- {
- let inner = self.inner.with_filter(filter);
- EchoClient { inner }
- }
/// UnaryEcho is unary echo.
pub async fn unary_echo(
&mut self,
@@ -71,8 +53,11 @@ pub mod echo_client {
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("UnaryEcho"));
let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
- self.inner.unary(request, codec, path).await
+ self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
@@ -81,10 +66,15 @@ pub mod echo_client {
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner.server_streaming(request, codec, path).await
+ self.inner
+ .server_streaming(request, codec, path, invocation)
+ .await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
@@ -93,10 +83,15 @@ pub mod echo_client {
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner.client_streaming(request, codec, path).await
+ self.inner
+ .client_streaming(request, codec, path, invocation)
+ .await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
@@ -105,10 +100,15 @@ pub mod echo_client {
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
let codec =
dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let invocation = RpcInvocation::default()
+
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
+ .with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner.bidi_streaming(request, codec, path).await
+ self.inner
+ .bidi_streaming(request, codec, path, invocation)
+ .await
}
}
}
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index b698bc7..c4b20cc 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -22,9 +22,12 @@ prost-derive = {version = "0.10", optional = true}
prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
+tracing = "0.1"
+tracing-subscriber = "0.2.0"
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version =
"0.2.0"}
[build-dependencies]
dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/greeter/proto/greeter.proto
b/examples/greeter/proto/greeter.proto
index 0d8be79..a0c466a 100644
--- a/examples/greeter/proto/greeter.proto
+++ b/examples/greeter/proto/greeter.proto
@@ -29,7 +29,7 @@ message GreeterReply {
string message = 1;
}
-service Greeter{
+service Greeter {
// unary
rpc greet(GreeterRequest) returns (GreeterReply);
diff --git a/examples/greeter/src/greeter/client.rs
b/examples/greeter/src/greeter/client.rs
index 2004b6f..c493d60 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -23,10 +23,31 @@ pub mod protos {
use dubbo::codegen::*;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() {
- let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
+ // a builder for `FmtSubscriber`.
+ let subscriber = FmtSubscriber::builder()
+ // all spans/events at INFO level or above (e.g. info, warn, error)
+ // will be written to stdout.
+ .with_max_level(Level::INFO)
+ // completes the builder.
+ .finish();
+
+ tracing::subscriber::set_global_default(subscriber).expect("setting
default subscriber failed");
+
+ let mut cli =
GreeterClient::new(ClientBuilder::from_static(&"http://127.0.0.1:8888"));
+
+ // Here is an example that uses the ZooKeeper registry:
+ // let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+ // Ok(val) => val,
+ // Err(_) => "localhost:2181".to_string(),
+ // };
+ // let zkr = ZookeeperRegistry::new(&zk_connect_string);
+ // let directory = RegistryDirectory::new(Box::new(zkr));
+ // cli = cli.with_directory(Box::new(directory));
println!("# unary call");
let resp = cli
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
new file mode 100644
index 0000000..cf040fa
--- /dev/null
+++ b/registry-zookeeper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "dubbo-registry-zookeeper"
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+zookeeper = "0.6.1"
+dubbo = {path = "../dubbo/", version = "0.2.0"}
+serde_json = "1.0"
+serde = {version = "1.0.145",features = ["derive"]}
+tracing = "0.1"
diff --git a/registry-zookeeper/LICENSE b/registry-zookeeper/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/registry-zookeeper/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
new file mode 100644
index 0000000..ccfce10
--- /dev/null
+++ b/registry-zookeeper/src/lib.rs
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod zookeeper_registry;
+
+#[cfg(test)]
+mod tests {
+    /// Placeholder sanity check.
+    #[test]
+    fn it_works() {
+        assert_eq!(2 + 2, 4);
+    }
+}
diff --git a/registry-zookeeper/src/zookeeper_registry.rs
b/registry-zookeeper/src/zookeeper_registry.rs
new file mode 100644
index 0000000..4c82634
--- /dev/null
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#![allow(unused_variables, dead_code, missing_docs)]
+
+use dubbo::common::url::Url;
+use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::NotifyListener;
+use dubbo::registry::Registry;
+use dubbo::registry::ServiceEvent;
+use dubbo::StdError;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::time::Duration;
+use tracing::info;
+
+use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+// extract service registry metadata from url
+// Reference snippets (Go syntax) for the URL and path layout:
+//   rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+//   dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+
+pub const REGISTRY_GROUP_KEY: &str = "registry.group";
+
+/// Watcher passed to `ZooKeeper::connect`; it only records the events it
+/// receives.
+struct LoggingWatcher;
+impl Watcher for LoggingWatcher {
+    /// Logs every watched event at INFO level.
+    fn handle(&self, e: WatchedEvent) {
+        // Library code should not write to stdout directly; go through
+        // `tracing` like the rest of this module so the application decides
+        // where (and whether) the event is shown.
+        info!("{:?}", e)
+    }
+}
+
+//#[derive(Debug)]
+/// Registry implementation that talks to ZooKeeper through `zk_client`.
+pub struct ZookeeperRegistry {
+ /// Base path under which `subscribe` looks up applications; `new` sets it to "/services".
+ root_path: String,
+ /// Client handle; kept in an `Arc` so `create_listener` can share it with the watchers it builds.
+ zk_client: Arc<ZooKeeper>,
+
+ /// Listeners registered through `subscribe`, keyed by service name.
+ listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as
Registry>::NotifyListener>>>,
+}
+
+/// Placeholder listener type; neither trait method is implemented yet.
+pub struct MyNotifyListener {}
+
+impl NotifyListener for MyNotifyListener {
+ /// Not implemented: calling this panics via `todo!()`.
+ fn notify(&self, event: dubbo::registry::ServiceEvent) {
+ todo!()
+ }
+
+ /// Not implemented: calling this panics via `todo!()`.
+ fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
+ todo!()
+ }
+}
+
+/// Instance record that this module deserializes from the JSON payload of a
+/// ZooKeeper node.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ZkServiceInstance {
+    name: String,
+    address: String,
+    port: i32,
+}
+
+impl ZkServiceInstance {
+    /// Borrows the `name` field.
+    pub fn get_service_name(&self) -> &str {
+        &self.name
+    }
+
+    /// Borrows the `address` field.
+    pub fn get_host(&self) -> &str {
+        &self.address
+    }
+
+    /// Returns the `port` field.
+    pub fn get_port(&self) -> i32 {
+        self.port
+    }
+}
+
+impl ZookeeperRegistry {
+    /// Connects to ZooKeeper at `connect_string` (passing a 15-second
+    /// `Duration` as the timeout) and returns a registry rooted at `/services`
+    /// with no listeners registered.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the connection cannot be established.
+    pub fn new(connect_string: &str) -> ZookeeperRegistry {
+        let zk_client =
+            ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+        ZookeeperRegistry {
+            root_path: "/services".to_string(),
+            zk_client: Arc::new(zk_client),
+            listeners: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Builds the watcher that `subscribe` installs on `path`. The watcher
+    /// shares this registry's client and forwards changes to `listener`.
+    fn create_listener(
+        &self,
+        path: String,
+        service_name: String,
+        listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
+    ) -> ServiceInstancesChangedListener {
+        // The arguments are already owned, so they are moved into the watcher
+        // directly; no intermediate collections or clones are needed.
+        ServiceInstancesChangedListener {
+            zk_client: Arc::clone(&self.zk_client),
+            path,
+            service_name,
+            listener,
+        }
+    }
+
+    /// Reads the node `/dubbo/mapping/<service_name>` and returns its payload
+    /// as the application name.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the node cannot be read or its payload is not valid UTF-8.
+    fn get_app_name(&self, service_name: String) -> String {
+        let mapping_path = format!("/dubbo/mapping/{}", service_name);
+        let (data, _stat) = self.zk_client.get_data(&mapping_path, false).unwrap();
+        // `from_utf8` takes ownership of the bytes, so the payload is validated
+        // in place instead of being copied into a second buffer.
+        String::from_utf8(data).unwrap_or_else(|e| panic!("Invalid UTF-8 sequence: {}", e))
+    }
+}
+
+impl Registry for ZookeeperRegistry {
+    type NotifyListener = MemoryNotifyListener;
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    fn register(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    /// Subscribes `listener` to the instances of the first service name
+    /// carried by `url`.
+    ///
+    /// Steps: resolve the application name for the service, install a
+    /// children watch on `<root_path>/<app_name>`, read every child node as a
+    /// JSON `ZkServiceInstance`, and deliver the resulting `triple://` URLs to
+    /// the listener in a single "ADD" event. A service that already has a
+    /// listener is left untouched and `Ok(())` is returned immediately.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `url` carries no service name, if a lock is poisoned, or if
+    /// any ZooKeeper read, UTF-8 decoding, JSON parsing or URL construction
+    /// fails.
+    fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        let binding = url.get_service_name();
+        let service_name = binding.get(0).unwrap();
+
+        // Check for an existing subscription before any ZooKeeper traffic:
+        // the application-name lookup below is a blocking round trip that
+        // panics on failure, and a duplicate subscribe does not need it.
+        if self.listeners.read().unwrap().contains_key(service_name) {
+            return Ok(());
+        }
+
+        let app_name = self.get_app_name(service_name.clone());
+        let path = format!("{}/{}", self.root_path, app_name);
+
+        let arc_listener = Arc::new(listener);
+        self.listeners
+            .write()
+            .unwrap()
+            .insert(service_name.to_string(), Arc::clone(&arc_listener));
+
+        let zk_listener = self.create_listener(
+            path.clone(),
+            service_name.to_string(),
+            Arc::clone(&arc_listener),
+        );
+
+        // Lists the current children and installs the watch in one call.
+        let children = self.zk_client.get_children_w(&path, zk_listener).unwrap();
+        let result: Vec<Url> = children
+            .iter()
+            .map(|node_key| {
+                let (data, _stat) = self
+                    .zk_client
+                    .get_data(&format!("{}/{}", path, node_key), false)
+                    .unwrap();
+                let json = std::str::from_utf8(&data).unwrap();
+                let instance: ZkServiceInstance = serde_json::from_str(json).unwrap();
+                Url::from_url(&format!(
+                    "triple://{}:{}/{}",
+                    instance.get_host(),
+                    instance.get_port(),
+                    service_name
+                ))
+                .unwrap()
+            })
+            .collect();
+
+        info!("notifing {}->{:?}", service_name, result);
+        arc_listener.notify(ServiceEvent {
+            key: service_name.to_string(),
+            action: String::from("ADD"),
+            service: result,
+        });
+        Ok(())
+    }
+
+    /// Not implemented yet; calling this panics via `todo!()`.
+    fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        todo!()
+    }
+}
+
+/// ZooKeeper watcher that, on a children-changed event, reloads the instance
+/// list for `service_name` and forwards it to `listener`.
+pub struct ServiceInstancesChangedListener {
+ /// Client used to re-read the children and their data.
+ zk_client: Arc<ZooKeeper>,
+ /// Path this watcher was created for; `handle` works from the event's path and does not read this field.
+ path: String,
+
+ /// Service name used as the event key and as the path segment of the generated URLs.
+ service_name: String,
+ /// Receives a `ServiceEvent` from `handle` after each reload.
+ listener: Arc<MemoryNotifyListener>,
+}
+
+impl Watcher for ServiceInstancesChangedListener {
+    /// Reacts to a `NodeChildrenChanged` event that carries a path: reloads
+    /// every child of that path as a JSON `ZkServiceInstance`, converts each
+    /// one to a `triple://` URL and sends the full list to the listener as an
+    /// "ADD" event. All other events are ignored.
+    ///
+    /// # Panics
+    ///
+    /// Panics if listing the children, reading a child, UTF-8 decoding, JSON
+    /// parsing or URL construction fails.
+    fn handle(&self, event: WatchedEvent) {
+        if let (WatchedEventType::NodeChildrenChanged, Some(path)) =
+            (event.event_type, event.path)
+        {
+            // ZooKeeper watches are one-time triggers, so a fresh watcher must
+            // be installed for the next change. Listing the children and
+            // re-arming the watch in the same call means a change cannot slip
+            // in between "read" and "watch", and a failure to re-arm is no
+            // longer silently dropped.
+            let next_watcher = ServiceInstancesChangedListener {
+                zk_client: Arc::clone(&self.zk_client),
+                path: path.clone(),
+                service_name: self.service_name.clone(),
+                listener: Arc::clone(&self.listener),
+            };
+            let dirs = self
+                .zk_client
+                .get_children_w(&path, next_watcher)
+                .expect("failed to list children and re-register the watch");
+
+            let result: Vec<Url> = dirs
+                .iter()
+                .map(|node_key| {
+                    let (data, _stat) = self
+                        .zk_client
+                        .get_data(&format!("{}/{}", path, node_key), false)
+                        .unwrap();
+                    let json = std::str::from_utf8(&data).unwrap();
+                    let instance: ZkServiceInstance = serde_json::from_str(json).unwrap();
+                    Url::from_url(&format!(
+                        "triple://{}:{}/{}",
+                        instance.get_host(),
+                        instance.get_port(),
+                        self.service_name
+                    ))
+                    .unwrap()
+                })
+                .collect();
+
+            info!("notifing {}->{:?}", self.service_name, result);
+            self.listener.notify(ServiceEvent {
+                key: self.service_name.clone(),
+                action: String::from("ADD"),
+                service: result,
+            });
+        }
+    }
+}
+
+// `NotifyListener` is not implemented for this type yet: both methods panic via `todo!()`.
+impl NotifyListener for ServiceInstancesChangedListener {
+ /// Not implemented: calling this panics via `todo!()`.
+ fn notify(&self, event: ServiceEvent) {
+ todo!()
+ }
+
+ /// Not implemented: calling this panics via `todo!()`.
+ fn notify_all(&self, event: ServiceEvent) {
+ todo!()
+ }
+}
+
+// Smoke test against a live ZooKeeper ensemble. It is ignored by default
+// because it needs network access; point ZOOKEEPER_SERVERS at an ensemble
+// (default "localhost:2181") and run `cargo test -- --ignored`.
+#[test]
+#[ignore = "requires a running ZooKeeper ensemble (ZOOKEEPER_SERVERS)"]
+fn it_works() {
+    let connect_string =
+        std::env::var("ZOOKEEPER_SERVERS").unwrap_or_else(|_| "localhost:2181".to_string());
+    let zk_client =
+        ZooKeeper::connect(&connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+
+    // The watcher is handed to the client by value; it prints whatever event
+    // the calls below trigger.
+    let watcher = TestZkWatcher {
+        watcher: Arc::new(None),
+    };
+
+    // Results are discarded on purpose: "/test" and its children may or may
+    // not exist on the target ensemble, and this test only exercises the
+    // calls.
+    let _ = zk_client.get_children_w("/test", watcher);
+    let _ = zk_client.delete("/test/a", None);
+    let _ = zk_client.delete("/test/b", None);
+    let _ = zk_client.create(
+        "/test/a",
+        vec![1, 3],
+        zookeeper::Acl::open_unsafe().clone(),
+        zookeeper::CreateMode::Ephemeral,
+    );
+    let _ = zk_client.create(
+        "/test/b",
+        vec![1, 3],
+        zookeeper::Acl::open_unsafe().clone(),
+        zookeeper::CreateMode::Ephemeral,
+    );
+    let _ = zk_client.close();
+}
+
+// Watcher type constructed by the `it_works` test in this file.
+struct TestZkWatcher {
+ // Optional nested watcher; `handle` does not read it.
+ pub watcher: Arc<Option<TestZkWatcher>>,
+}
+
+impl Watcher for TestZkWatcher {
+ // Prints each received event to stdout.
+ fn handle(&self, event: WatchedEvent) {
+ println!("event: {:?}", event);
+ }
+}