This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 59e2ad6 use nightly version rust to check, format. (#111)
59e2ad6 is described below
commit 59e2ad6d6269cb8b9a4af2c5163017891a2de288
Author: Robert LU <[email protected]>
AuthorDate: Tue Feb 14 18:41:02 2023 +0800
use nightly version rust to check, format. (#111)
* fix generated file duplication
For dir `examples/echo`,
* `src/echo/` is source dir.
* `src/generated/` is generated source dir.
* `proto/` is `*.proto` dir.
* fix rustfmt error and license check
* rustfmt
* use nightly version to check
* format
* generated
* .licenserc.yaml
---
.github/workflows/github-actions.yml | 3 +-
.licenserc.yaml | 2 +
.rustfmt.toml | 4 +
Cargo.toml | 2 +-
config/src/config.rs | 4 +-
dubbo-build/src/prost.rs | 10 +-
dubbo/src/cluster/directory.rs | 21 ++-
dubbo/src/codegen.rs | 52 +++---
dubbo/src/context.rs | 8 +-
dubbo/src/filter/service.rs | 3 +-
dubbo/src/framework.rs | 14 +-
dubbo/src/lib.rs | 3 +-
dubbo/src/protocol/mod.rs | 6 +-
dubbo/src/protocol/triple/mod.rs | 6 +-
dubbo/src/protocol/triple/triple_invoker.rs | 4 +-
dubbo/src/protocol/triple/triple_protocol.rs | 15 +-
dubbo/src/registry/memory_registry.rs | 7 +-
dubbo/src/registry/protocol.rs | 22 +--
dubbo/src/status.rs | 3 +-
dubbo/src/triple/client/builder.rs | 8 +-
dubbo/src/triple/client/triple.rs | 12 +-
dubbo/src/triple/codec/buffer.rs | 3 +-
dubbo/src/triple/compression.rs | 6 +-
dubbo/src/triple/decode.rs | 9 +-
dubbo/src/triple/server/builder.rs | 6 +-
dubbo/src/triple/server/service.rs | 6 +-
dubbo/src/triple/server/triple.rs | 18 +-
dubbo/src/triple/transport/connection.rs | 6 +-
.../triple/transport/connector/http_connector.rs | 9 +-
.../triple/transport/connector/unix_connector.rs | 9 +-
.../src/triple/transport/listener/tcp_listener.rs | 3 +-
dubbo/src/triple/transport/resolver/dns.rs | 13 +-
dubbo/src/triple/transport/resolver/mod.rs | 6 +-
dubbo/src/triple/transport/router.rs | 9 +-
dubbo/src/triple/transport/service.rs | 3 +-
dubbo/src/utils/boxed.rs | 2 +-
examples/echo/src/echo/client.rs | 3 +-
examples/echo/src/echo/server.rs | 9 +-
examples/echo/src/generated/grpc.examples.echo.rs | 200 ++++++++++++---------
registry-zookeeper/src/zookeeper_registry.rs | 21 ++-
40 files changed, 303 insertions(+), 247 deletions(-)
diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index d6c85bb..9e53db5 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -22,7 +22,8 @@ jobs:
- uses: actions/checkout@main
- uses: actions-rs/toolchain@v1
with:
- toolchain: stable
+ toolchain: nightly
+ default: true
- name: Set up cargo cache
uses: actions/cache@v3
continue-on-error: false
diff --git a/.licenserc.yaml b/.licenserc.yaml
index b4be62a..b4c646c 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -56,12 +56,14 @@ header: # `header` section is configurations for source codes license header.
paths-ignore: # `paths-ignore` are the path list that will be ignored by license-eye.
- '**/*.md'
- '**/Cargo.toml'
+ - '.rustfmt.toml'
- 'LICENSE'
- 'NOTICE'
- '.asf.yml'
- '.gitignore'
- '.github'
- "**/*.yaml"
+ - "**/generated/**"
comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
# license-location-threshold specifies the index threshold where the license header can be located,
diff --git a/.rustfmt.toml b/.rustfmt.toml
new file mode 100644
index 0000000..3a5aab0
--- /dev/null
+++ b/.rustfmt.toml
@@ -0,0 +1,4 @@
+unstable_features = true
+format_generated_files = false
+imports_granularity = "Crate"
+reorder_imports = true
diff --git a/Cargo.toml b/Cargo.toml
index d3942a6..45c4492 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,5 +9,5 @@ members = [
"dubbo",
"examples/echo",
"examples/greeter",
- "dubbo-build"
+ "dubbo-build",
]
diff --git a/config/src/config.rs b/config/src/config.rs
index 500fd33..88bf0de 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -20,9 +20,7 @@ use std::{collections::HashMap, env, fs};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
-use super::protocol::ProtocolConfig;
-use super::provider::ProviderConfig;
-use super::service::ServiceConfig;
+use super::{protocol::ProtocolConfig, provider::ProviderConfig, service::ServiceConfig};
pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
diff --git a/dubbo-build/src/prost.rs b/dubbo-build/src/prost.rs
index f4dc588..3918fd3 100644
--- a/dubbo-build/src/prost.rs
+++ b/dubbo-build/src/prost.rs
@@ -20,9 +20,9 @@ use prost_build::{Config, Method, ServiceGenerator};
use quote::ToTokens;
use std::path::{Path, PathBuf};
-use crate::client;
-use crate::server;
-use crate::Attributes;
+use crate::{client, server, Attributes};
+
+const PACKAGE_HEADER: &str = "// @generated by apache/dubbo-rust.\n\n";
/// Simple `.proto` compiling. Use [`configure`] instead if you need more
options.
///
@@ -184,6 +184,10 @@ impl ServiceGenerator for SvcGenerator {
self.servers = TokenStream::default();
}
}
+
+ fn finalize_package(&mut self, _package: &str, buf: &mut String) {
+ buf.insert_str(0, PACKAGE_HEADER);
+ }
}
pub struct DubboService {
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 4952de7..d1b5373 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::str::FromStr;
-use std::sync::{Arc, RwLock};
-
-use crate::common::url::Url;
-use crate::invocation::{Invocation, RpcInvocation};
-use crate::registry::memory_registry::MemoryNotifyListener;
-use crate::registry::{BoxRegistry, RegistryWrapper};
+use std::{
+ collections::HashMap,
+ fmt::Debug,
+ str::FromStr,
+ sync::{Arc, RwLock},
+};
+
+use crate::{
+ common::url::Url,
+ invocation::{Invocation, RpcInvocation},
+ registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
+};
/// Directory.
///
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 3535fbf..5d0a273 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-pub use std::sync::Arc;
-pub use std::task::{Context, Poll};
+pub use std::{
+ sync::Arc,
+ task::{Context, Poll},
+};
pub use async_trait::async_trait;
pub use bytes::Bytes;
@@ -24,26 +26,28 @@ pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
-pub use super::cluster::directory::Directory;
-pub use super::cluster::directory::RegistryDirectory;
-pub use super::invocation::RpcInvocation;
-pub use super::invocation::{IntoStreamingRequest, Request, Response};
-pub use super::protocol::triple::triple_invoker::TripleInvoker;
-pub use super::protocol::Invoker;
-pub use super::registry::BoxRegistry;
-pub use super::registry::Registry;
-pub use super::registry::RegistryWrapper;
-pub use super::triple::client::TripleClient;
-pub use super::triple::codec::prost::ProstCodec;
-pub use super::triple::codec::Codec;
-pub use super::triple::decode::Decoding;
-pub use super::triple::server::service::{
- ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc,
+pub use super::{
+ cluster::directory::{Directory, RegistryDirectory},
+ empty_body,
+ invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
+ protocol::{triple::triple_invoker::TripleInvoker, Invoker},
+ registry::{BoxRegistry, Registry, RegistryWrapper},
+ triple::{
+ client::TripleClient,
+ codec::{prost::ProstCodec, Codec},
+ decode::Decoding,
+ server::{
+ service::{ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc},
+ TripleServer,
+ },
+ },
+ BoxBody, BoxFuture, StdError,
+};
+pub use crate::{
+ filter::{service::FilterService, Filter},
+ triple::{
+ client::builder::{ClientBoxService, ClientBuilder},
+ server::builder::ServerBuilder,
+ transport::connection::Connection,
+ },
};
-pub use super::triple::server::TripleServer;
-pub use super::{empty_body, BoxBody, BoxFuture, StdError};
-pub use crate::filter::service::FilterService;
-pub use crate::filter::Filter;
-pub use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
-pub use crate::triple::server::builder::ServerBuilder;
-pub use crate::triple::transport::connection::Connection;
diff --git a/dubbo/src/context.rs b/dubbo/src/context.rs
index c438bcd..6def1bf 100644
--- a/dubbo/src/context.rs
+++ b/dubbo/src/context.rs
@@ -15,9 +15,11 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
-use std::thread;
+use std::{
+ collections::HashMap,
+ sync::{Arc, Mutex},
+ thread,
+};
use serde_json::Value;
use state::Container;
diff --git a/dubbo/src/filter/service.rs b/dubbo/src/filter/service.rs
index 6ca3428..1e0c982 100644
--- a/dubbo/src/filter/service.rs
+++ b/dubbo/src/filter/service.rs
@@ -18,8 +18,7 @@
use tower_service::Service;
use super::Filter;
-use crate::invocation::Metadata;
-use crate::invocation::Request;
+use crate::invocation::{Metadata, Request};
#[derive(Clone)]
pub struct FilterService<S, F> {
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index 354b65a..c8e4a9b 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-use std::pin::Pin;
+use std::{collections::HashMap, pin::Pin};
-use futures::future;
-use futures::Future;
+use futures::{future, Future};
-use crate::common::url::Url;
-use crate::protocol::{BoxExporter, Protocol};
-use crate::registry::protocol::RegistryProtocol;
+use crate::{
+ common::url::Url,
+ protocol::{BoxExporter, Protocol},
+ registry::protocol::RegistryProtocol,
+};
use dubbo_config::{get_global_config, RootConfig};
// Invoker是否可以基于hyper写一个通用的
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index bf97ef7..2174365 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -29,8 +29,7 @@ pub mod triple;
pub mod utils;
use http_body::Body;
-use std::future::Future;
-use std::pin::Pin;
+use std::{future::Future, pin::Pin};
pub use framework::Dubbo;
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index f58212d..9505dca 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -18,8 +18,10 @@
pub mod server_desc;
pub mod triple;
-use std::future::Future;
-use std::task::{Context, Poll};
+use std::{
+ future::Future,
+ task::{Context, Poll},
+};
use async_trait::async_trait;
use tower_service::Service;
diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs
index b039f65..e97b150 100644
--- a/dubbo/src/protocol/triple/mod.rs
+++ b/dubbo/src/protocol/triple/mod.rs
@@ -21,11 +21,9 @@ pub mod triple_protocol;
pub mod triple_server;
use lazy_static::lazy_static;
-use std::collections::HashMap;
-use std::sync::RwLock;
+use std::{collections::HashMap, sync::RwLock};
-use crate::utils::boxed_clone::BoxCloneService;
-use crate::BoxBody;
+use crate::{utils::boxed_clone::BoxCloneService, BoxBody};
pub type GrpcBoxCloneService =
BoxCloneService<http::Request<hyper::Body>, http::Response<BoxBody>,
std::convert::Infallible>;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 2c03fa7..4c411d1 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -17,9 +17,7 @@
use tower_service::Service;
-use crate::common::url::Url;
-use crate::protocol::Invoker;
-use crate::triple::client::builder::ClientBoxService;
+use crate::{common::url::Url, protocol::Invoker, triple::client::builder::ClientBoxService};
pub struct TripleInvoker {
url: Url,
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index 4d56b7a..3577f52 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-use std::boxed::Box;
-use std::collections::HashMap;
+use std::{boxed::Box, collections::HashMap};
use async_trait::async_trait;
-use super::triple_exporter::TripleExporter;
-use super::triple_invoker::TripleInvoker;
-use super::triple_server::TripleServer;
-use crate::common::url::Url;
-use crate::protocol::{BoxExporter, Protocol};
+use super::{
+ triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer,
+};
+use crate::{
+ common::url::Url,
+ protocol::{BoxExporter, Protocol},
+};
#[derive(Clone)]
pub struct TripleProtocol {
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index 4d0350e..29e0ec2 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -16,9 +16,10 @@
*/
#![allow(unused_variables, dead_code, missing_docs)]
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::sync::RwLock;
+use std::{
+ collections::HashMap,
+ sync::{Arc, RwLock},
+};
use crate::common::url::Url;
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index bf6a8a5..45a24cb 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
+use std::{
+ collections::HashMap,
+ sync::{Arc, RwLock},
+};
-use super::memory_registry::MemoryRegistry;
-use super::BoxRegistry;
-use crate::common::url::Url;
-use crate::protocol::triple::triple_exporter::TripleExporter;
-use crate::protocol::triple::triple_protocol::TripleProtocol;
-use crate::protocol::BoxExporter;
-use crate::protocol::BoxInvoker;
-use crate::protocol::Protocol;
+use super::{memory_registry::MemoryRegistry, BoxRegistry};
+use crate::{
+ common::url::Url,
+ protocol::{
+ triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol},
+ BoxExporter, BoxInvoker, Protocol,
+ },
+};
#[derive(Clone, Default)]
pub struct RegistryProtocol {
diff --git a/dubbo/src/status.rs b/dubbo/src/status.rs
index 926c1d6..7258b48 100644
--- a/dubbo/src/status.rs
+++ b/dubbo/src/status.rs
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-use std::error::Error;
-use std::fmt;
+use std::{error::Error, fmt};
use http::HeaderValue;
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 40a6ffb..e19eb94 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-use crate::cluster::directory::StaticDirectory;
-use crate::codegen::Directory;
-use crate::triple::compression::CompressionEncoding;
-use crate::utils::boxed::BoxService;
+use crate::{
+ cluster::directory::StaticDirectory, codegen::Directory,
+ triple::compression::CompressionEncoding, utils::boxed::BoxService,
+};
use super::TripleClient;
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index b9f6492..3c6172d 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -23,15 +23,13 @@ use http::HeaderValue;
use rand::prelude::SliceRandom;
use tower_service::Service;
-use super::super::transport::connection::Connection;
-use super::builder::ClientBuilder;
+use super::{super::transport::connection::Connection, builder::ClientBuilder};
use crate::codegen::{Directory, RpcInvocation};
-use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
-use crate::triple::codec::Codec;
-use crate::triple::compression::CompressionEncoding;
-use crate::triple::decode::Decoding;
-use crate::triple::encode::encode;
+use crate::{
+ invocation::{IntoStreamingRequest, Metadata, Request, Response},
+ triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
+};
#[derive(Debug, Clone, Default)]
pub struct TripleClient {
diff --git a/dubbo/src/triple/codec/buffer.rs b/dubbo/src/triple/codec/buffer.rs
index 850f988..52192ad 100644
--- a/dubbo/src/triple/codec/buffer.rs
+++ b/dubbo/src/triple/codec/buffer.rs
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-use bytes::buf::UninitSlice;
-use bytes::{Buf, BufMut, BytesMut};
+use bytes::{buf::UninitSlice, Buf, BufMut, BytesMut};
/// A specialized buffer to decode gRPC messages from.
#[derive(Debug)]
diff --git a/dubbo/src/triple/compression.rs b/dubbo/src/triple/compression.rs
index b173211..5e3ee86 100644
--- a/dubbo/src/triple/compression.rs
+++ b/dubbo/src/triple/compression.rs
@@ -18,8 +18,10 @@
use std::collections::HashMap;
use bytes::{Buf, BufMut, BytesMut};
-use flate2::read::{GzDecoder, GzEncoder};
-use flate2::Compression;
+use flate2::{
+ read::{GzDecoder, GzEncoder},
+ Compression,
+};
use lazy_static::lazy_static;
pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
diff --git a/dubbo/src/triple/decode.rs b/dubbo/src/triple/decode.rs
index bfaf46c..26c4cd6 100644
--- a/dubbo/src/triple/decode.rs
+++ b/dubbo/src/triple/decode.rs
@@ -18,13 +18,14 @@
use std::{pin::Pin, task::Poll};
use bytes::{Buf, BufMut, Bytes, BytesMut};
-use futures_util::Stream;
-use futures_util::{future, ready};
+use futures_util::{future, ready, Stream};
use http_body::Body;
use super::compression::{decompress, CompressionEncoding};
-use crate::invocation::Metadata;
-use crate::triple::codec::{DecodeBuf, Decoder};
+use crate::{
+ invocation::Metadata,
+ triple::codec::{DecodeBuf, Decoder},
+};
type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes,
crate::status::Status>;
diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs
index ce494db..d1bcbcc 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -20,13 +20,11 @@ use std::{
str::FromStr,
};
-use http::Uri;
-use http::{Request, Response};
+use http::{Request, Response, Uri};
use hyper::body::Body;
use tower_service::Service;
-use crate::BoxBody;
-use crate::{common::url::Url, triple::transport::DubboServer};
+use crate::{common::url::Url, triple::transport::DubboServer, BoxBody};
#[derive(Clone, Default, Debug)]
pub struct ServerBuilder {
diff --git a/dubbo/src/triple/server/service.rs b/dubbo/src/triple/server/service.rs
index bce9618..1b1c55a 100644
--- a/dubbo/src/triple/server/service.rs
+++ b/dubbo/src/triple/server/service.rs
@@ -18,8 +18,10 @@
use futures_util::{Future, Stream};
use tower_service::Service;
-use crate::invocation::{Request, Response};
-use crate::triple::decode::Decoding;
+use crate::{
+ invocation::{Request, Response},
+ triple::decode::Decoding,
+};
pub trait StreamingSvc<R> {
type Response;
diff --git a/dubbo/src/triple/server/triple.rs b/dubbo/src/triple/server/triple.rs
index d1d1a1d..2c0626c 100644
--- a/dubbo/src/triple/server/triple.rs
+++ b/dubbo/src/triple/server/triple.rs
@@ -18,15 +18,17 @@
use futures_util::{future, stream, StreamExt, TryStreamExt};
use http_body::Body;
-use crate::invocation::Request;
-use crate::triple::codec::Codec;
-use crate::triple::compression::{CompressionEncoding, COMPRESSIONS};
-use crate::triple::decode::Decoding;
-use crate::triple::encode::encode_server;
-use crate::triple::server::service::{
- ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc,
+use crate::{
+ invocation::Request,
+ triple::{
+ codec::Codec,
+ compression::{CompressionEncoding, COMPRESSIONS},
+ decode::Decoding,
+ encode::encode_server,
+ server::service::{ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc},
+ },
+ BoxBody,
};
-use crate::BoxBody;
use dubbo_config::BusinessConfig;
pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index 33e164b..fe9cc16 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -17,12 +17,10 @@
use std::task::Poll;
-use hyper::client::conn::Builder;
-use hyper::client::service::Connect;
+use hyper::client::{conn::Builder, service::Connect};
use tower_service::Service;
-use crate::boxed;
-use crate::triple::transport::connector::get_connector;
+use crate::{boxed, triple::transport::connector::get_connector};
#[derive(Debug, Clone)]
pub struct Connection {
diff --git a/dubbo/src/triple/transport/connector/http_connector.rs b/dubbo/src/triple/transport/connector/http_connector.rs
index b99b036..f324466 100644
--- a/dubbo/src/triple/transport/connector/http_connector.rs
+++ b/dubbo/src/triple/transport/connector/http_connector.rs
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
-use std::str::FromStr;
+use std::{
+ net::{Ipv4Addr, SocketAddr, SocketAddrV4},
+ str::FromStr,
+};
use http::Uri;
use hyper::client::connect::dns::Name;
use tokio::net::TcpStream;
use tower_service::Service;
-use crate::triple::transport::resolver::dns::DnsResolver;
-use crate::triple::transport::resolver::Resolve;
+use crate::triple::transport::resolver::{dns::DnsResolver, Resolve};
#[derive(Clone, Default)]
pub struct HttpConnector<R = DnsResolver> {
diff --git a/dubbo/src/triple/transport/connector/unix_connector.rs b/dubbo/src/triple/transport/connector/unix_connector.rs
index 4e1f3fa..491ba23 100644
--- a/dubbo/src/triple/transport/connector/unix_connector.rs
+++ b/dubbo/src/triple/transport/connector/unix_connector.rs
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
-use std::str::FromStr;
+use std::{
+ net::{Ipv4Addr, SocketAddr, SocketAddrV4},
+ str::FromStr,
+};
use http::Uri;
use hyper::client::connect::dns::Name;
use tokio::net::UnixStream;
use tower_service::Service;
-use crate::triple::transport::resolver::dns::DnsResolver;
-use crate::triple::transport::resolver::Resolve;
+use crate::triple::transport::resolver::{dns::DnsResolver, Resolve};
#[derive(Clone, Default)]
pub struct UnixConnector<R = DnsResolver> {
diff --git a/dubbo/src/triple/transport/listener/tcp_listener.rs b/dubbo/src/triple/transport/listener/tcp_listener.rs
index 4e64987..a7c9487 100644
--- a/dubbo/src/triple/transport/listener/tcp_listener.rs
+++ b/dubbo/src/triple/transport/listener/tcp_listener.rs
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-use std::net::SocketAddr;
-use std::task;
+use std::{net::SocketAddr, task};
use super::Listener;
use async_trait::async_trait;
diff --git a/dubbo/src/triple/transport/resolver/dns.rs b/dubbo/src/triple/transport/resolver/dns.rs
index f3d4448..985ef12 100644
--- a/dubbo/src/triple/transport/resolver/dns.rs
+++ b/dubbo/src/triple/transport/resolver/dns.rs
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-use std::future::Future;
-use std::net::SocketAddr;
-use std::net::ToSocketAddrs;
-use std::pin::Pin;
-use std::task::Poll;
-use std::vec;
+use std::{
+ future::Future,
+ net::{SocketAddr, ToSocketAddrs},
+ pin::Pin,
+ task::Poll,
+ vec,
+};
use tokio::task::JoinHandle;
use tower_service::Service;
diff --git a/dubbo/src/triple/transport/resolver/mod.rs b/dubbo/src/triple/transport/resolver/mod.rs
index 3b6111c..4723135 100644
--- a/dubbo/src/triple/transport/resolver/mod.rs
+++ b/dubbo/src/triple/transport/resolver/mod.rs
@@ -17,8 +17,10 @@
pub mod dns;
-use std::net::SocketAddr;
-use std::task::{self, Poll};
+use std::{
+ net::SocketAddr,
+ task::{self, Poll},
+};
use futures::Future;
use hyper::client::connect::dns::Name;
diff --git a/dubbo/src/triple/transport/router.rs b/dubbo/src/triple/transport/router.rs
index a64306b..07ebcde 100644
--- a/dubbo/src/triple/transport/router.rs
+++ b/dubbo/src/triple/transport/router.rs
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-use std::fmt;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
+use std::{
+ fmt,
+ pin::Pin,
+ task::{Context, Poll},
+};
use axum::Router;
use futures_core::Future;
diff --git a/dubbo/src/triple/transport/service.rs b/dubbo/src/triple/transport/service.rs
index a049c1d..9698276 100644
--- a/dubbo/src/triple/transport/service.rs
+++ b/dubbo/src/triple/transport/service.rs
@@ -23,8 +23,7 @@ use hyper::body::Body;
use tokio::time::Duration;
use tower_service::Service;
-use super::listener::get_listener;
-use super::router::DubboRouter;
+use super::{listener::get_listener, router::DubboRouter};
use crate::BoxBody;
#[derive(Default, Clone, Debug)]
diff --git a/dubbo/src/utils/boxed.rs b/dubbo/src/utils/boxed.rs
index 326dea6..9d7b181 100644
--- a/dubbo/src/utils/boxed.rs
+++ b/dubbo/src/utils/boxed.rs
@@ -19,8 +19,8 @@ use tower::ServiceExt;
use tower_layer::{layer_fn, LayerFn};
use tower_service::Service;
-use std::fmt;
use std::{
+ fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs
index f13d1e8..4d5bb3a 100644
--- a/examples/echo/src/echo/client.rs
+++ b/examples/echo/src/echo/client.rs
@@ -16,8 +16,7 @@
*/
use dubbo::codegen::*;
-use example_echo::generated::generated::echo_client::EchoClient;
-use example_echo::generated::generated::EchoRequest;
+use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest};
use futures_util::StreamExt;
pub struct FakeFilter {}
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index e09ba96..d7332b2 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-use std::io::ErrorKind;
-use std::pin::Pin;
+use std::{io::ErrorKind, pin::Pin};
use async_trait::async_trait;
-use dubbo::filter::context::ContextFilter;
-use dubbo::filter::timeout::TimeoutFilter;
-use futures_util::Stream;
-use futures_util::StreamExt;
+use dubbo::filter::{context::ContextFilter, timeout::TimeoutFilter};
+use futures_util::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 9c487e6..aa5d82b 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -1,30 +1,15 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// @generated by apache/dubbo-rust.
/// EchoRequest is the request for echo.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoRequest {
- #[prost(string, tag = "1")]
+ #[prost(string, tag="1")]
pub message: ::prost::alloc::string::String,
}
/// EchoResponse is the response for echo.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoResponse {
- #[prost(string, tag = "1")]
+ #[prost(string, tag="1")]
pub message: ::prost::alloc::string::String,
}
/// Generated client implementations.
@@ -51,12 +36,16 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let codec = dubbo::codegen::ProstCodec::<
+ super::EchoRequest,
+ super::EchoResponse,
+ >::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
- let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
+ let path = http::uri::PathAndQuery::from_static(
+ "/grpc.examples.echo.Echo/UnaryEcho",
+ );
self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
@@ -64,51 +53,51 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let codec = dubbo::codegen::ProstCodec::<
+ super::EchoRequest,
+ super::EchoResponse,
+ >::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner
- .server_streaming(request, codec, path, invocation)
- .await
+ self.inner.server_streaming(request, codec, path, invocation).await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let codec = dubbo::codegen::ProstCodec::<
+ super::EchoRequest,
+ super::EchoResponse,
+ >::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner
- .client_streaming(request, codec, path, invocation)
- .await
+ self.inner.client_streaming(request, codec, path, invocation).await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>,
dubbo::status::Status> {
- let codec =
- dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let codec = dubbo::codegen::ProstCodec::<
+ super::EchoRequest,
+ super::EchoResponse,
+ >::default();
let invocation = RpcInvocation::default()
.with_servie_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner
- .bidi_streaming(request, codec, path, invocation)
- .await
+ self.inner.bidi_streaming(request, codec, path, invocation).await
}
}
}
@@ -125,7 +114,9 @@ pub mod echo_server {
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
- type ServerStreamingEchoStream: futures_util::Stream<Item =
Result<super::EchoResponse, dubbo::status::Status>>
+ type ServerStreamingEchoStream: futures_util::Stream<
+ Item = Result<super::EchoResponse, dubbo::status::Status>,
+ >
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
@@ -139,14 +130,19 @@ pub mod echo_server {
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho
method.
- type BidirectionalStreamingEchoStream: futures_util::Stream<Item =
Result<super::EchoResponse, dubbo::status::Status>>
+ type BidirectionalStreamingEchoStream: futures_util::Stream<
+ Item = Result<super::EchoResponse, dubbo::status::Status>,
+ >
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
- ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
+ ) -> Result<
+ Response<Self::BidirectionalStreamingEchoStream>,
+ dubbo::status::Status,
+ >;
}
/// Echo is the echo service.
#[derive(Debug)]
@@ -176,7 +172,10 @@ pub mod echo_server {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
- fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -189,18 +188,26 @@ pub mod echo_server {
}
impl<T: Echo> UnarySvc<super::EchoRequest> for
UnaryEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
- fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
+ type Future = BoxFuture<
+ Response<Self::Response>,
+ dubbo::status::Status,
+ >;
+ fn call(
+ &mut self,
+ request: Request<super::EchoRequest>,
+ ) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move {
inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server = TripleServer::new(
+ dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default(),
+ );
let res = server.unary(UnaryEchoServer { inner },
req).await;
Ok(res)
};
@@ -211,22 +218,32 @@ pub mod echo_server {
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
+ impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
+ for ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
- type Future =
- BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
- fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
+ type Future = BoxFuture<
+ Response<Self::ResponseStream>,
+ dubbo::status::Status,
+ >;
+ fn call(
+ &mut self,
+ request: Request<super::EchoRequest>,
+ ) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move { inner.server_streaming_echo(request).await };
+ let fut = async move {
+ inner.server_streaming_echo(request).await
+ };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server = TripleServer::new(
+ dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default(),
+ );
let res = server
.server_streaming(ServerStreamingEchoServer {
inner }, req)
.await;
@@ -239,23 +256,31 @@ pub mod echo_server {
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
+ impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
+ for ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
+ type Future = BoxFuture<
+ Response<Self::Response>,
+ dubbo::status::Status,
+ >;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move { inner.client_streaming_echo(request).await };
+ let fut = async move {
+ inner.client_streaming_echo(request).await
+ };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server = TripleServer::new(
+ dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default(),
+ );
let res = server
.client_streaming(ClientStreamingEchoServer {
inner }, req)
.await;
@@ -268,41 +293,56 @@ pub mod echo_server {
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
+ impl<T: Echo> StreamingSvc<super::EchoRequest>
+ for BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream =
T::BidirectionalStreamingEchoStream;
- type Future =
- BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
+ type Future = BoxFuture<
+ Response<Self::ResponseStream>,
+ dubbo::status::Status,
+ >;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut =
- async move { inner.bidirectional_streaming_echo(request).await };
+ let fut = async move {
+ inner.bidirectional_streaming_echo(request).await
+ };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default());
+ let mut server = TripleServer::new(
+ dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default(),
+ );
let res = server
- .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
+ .bidi_streaming(
+ BidirectionalStreamingEchoServer {
+ inner,
+ },
+ req,
+ )
.await;
Ok(res)
};
Box::pin(fut)
}
- _ => Box::pin(async move {
- Ok(http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap())
- }),
+ _ => {
+ Box::pin(async move {
+ Ok(
+ http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap(),
+ )
+ })
+ }
}
}
}
diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs
index 4c82634..be4e24b 100644
--- a/registry-zookeeper/src/zookeeper_registry.rs
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -17,18 +17,17 @@
#![allow(unused_variables, dead_code, missing_docs)]
-use dubbo::common::url::Url;
-use dubbo::registry::memory_registry::MemoryNotifyListener;
-use dubbo::registry::NotifyListener;
-use dubbo::registry::Registry;
-use dubbo::registry::ServiceEvent;
-use dubbo::StdError;
+use dubbo::{
+ common::url::Url,
+ registry::{memory_registry::MemoryNotifyListener, NotifyListener, Registry, ServiceEvent},
+ StdError,
+};
use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::sync::RwLock;
-use std::time::Duration;
+use std::{
+ collections::{HashMap, HashSet},
+ sync::{Arc, RwLock},
+ time::Duration,
+};
use tracing::info;
use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};