This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a68707a  [ISSUES #190] Add invoker extension and enhance extension design (#195)
a68707a is described below

commit a68707a413ae51fd94bfa5541838b5934c5bce72
Author: 毛文超 <[email protected]>
AuthorDate: Wed May 29 16:39:40 2024 +0800

    [ISSUES #190] Add invoker extension and enhance extension design (#195)
    
    * add Invoker extension
    
    * refactor extension mod
    
    * remove Extension constraint
    
    * cargo fmt
    
    * add license header
    
    * cargo fmt
    
    * add error handling
    
    * fix compile error
    
    * add load invoker method
---
 dubbo/src/extension/invoker_extension.rs          | 303 ++++++++++++++++++++++
 dubbo/src/extension/mod.rs                        | 108 ++++++--
 dubbo/src/extension/registry_extension.rs         |  23 +-
 dubbo/src/params/extension_param.rs               |   3 +
 examples/echo/src/generated/grpc.examples.echo.rs | 127 ++++++---
 examples/greeter/src/greeter/client.rs            |   6 +-
 examples/greeter/src/greeter/server.rs            |   5 +-
 7 files changed, 506 insertions(+), 69 deletions(-)

diff --git a/dubbo/src/extension/invoker_extension.rs b/dubbo/src/extension/invoker_extension.rs
new file mode 100644
index 0000000..8d59e5b
--- /dev/null
+++ b/dubbo/src/extension/invoker_extension.rs
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::{
+    extension::{
+        invoker_extension::proxy::InvokerProxy, Extension, ExtensionFactories, 
ExtensionMetaInfo,
+        LoadExtensionPromise,
+    },
+    params::extension_param::{ExtensionName, ExtensionType},
+    url::UrlParam,
+    StdError, Url,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures_core::Stream;
+use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
+use thiserror::Error;
+
+#[async_trait]
+pub trait Invoker {
+    async fn invoke(
+        &self,
+        invocation: GrpcInvocation,
+    ) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError>;
+
+    async fn url(&self) -> Result<Url, StdError>;
+}
+
+pub enum CallType {
+    Unary,
+    ClientStream,
+    ServerStream,
+    BiStream,
+}
+
+pub struct GrpcInvocation {
+    service_name: String,
+    method_name: String,
+    arguments: Vec<Argument>,
+    attachments: HashMap<String, String>,
+    call_type: CallType,
+}
+
+pub struct Argument {
+    name: String,
+    value: Box<dyn Stream<Item = Box<dyn Serializable + Send + 'static>> + 
Send + 'static>,
+}
+
+pub trait Serializable {
+    fn serialize(&self, serialization_type: String) -> Result<Bytes, StdError>;
+}
+
+pub trait Deserializable {
+    fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> 
Result<Self, StdError>
+    where
+        Self: Sized;
+}
+
+pub mod proxy {
+    use crate::{
+        extension::invoker_extension::{GrpcInvocation, Invoker},
+        StdError, Url,
+    };
+    use async_trait::async_trait;
+    use bytes::Bytes;
+    use futures_core::Stream;
+    use std::pin::Pin;
+    use thiserror::Error;
+    use tokio::sync::{mpsc::Sender, oneshot};
+    use tracing::error;
+
+    pub(super) enum InvokerOpt {
+        Invoke(
+            GrpcInvocation,
+            oneshot::Sender<Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 
'static>>, StdError>>,
+        ),
+        Url(oneshot::Sender<Result<Url, StdError>>),
+    }
+
+    #[derive(Clone)]
+    pub struct InvokerProxy {
+        tx: Sender<InvokerOpt>,
+    }
+
+    #[async_trait]
+    impl Invoker for InvokerProxy {
+        async fn invoke(
+            &self,
+            invocation: GrpcInvocation,
+        ) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, 
StdError> {
+            let (tx, rx) = oneshot::channel();
+            let ret = self.tx.send(InvokerOpt::Invoke(invocation, tx)).await;
+            match ret {
+                Ok(_) => {}
+                Err(err) => {
+                    error!(
+                        "call invoke method failed by invoker proxy, error: 
{:?}",
+                        err
+                    );
+                    return Err(InvokerProxyError::new(
+                        "call invoke method failed by invoker proxy",
+                    )
+                    .into());
+                }
+            }
+            let ret = rx.await?;
+            ret
+        }
+
+        async fn url(&self) -> Result<Url, StdError> {
+            let (tx, rx) = oneshot::channel();
+            let ret = self.tx.send(InvokerOpt::Url(tx)).await;
+            match ret {
+                Ok(_) => {}
+                Err(err) => {
+                    error!("call url method failed by invoker proxy, error: 
{:?}", err);
+                    return Err(
+                        InvokerProxyError::new("call url method failed by 
invoker proxy").into(),
+                    );
+                }
+            }
+            let ret = rx.await?;
+            ret
+        }
+    }
+
+    impl From<Box<dyn Invoker + Send + 'static>> for InvokerProxy {
+        fn from(invoker: Box<dyn Invoker + Send + 'static>) -> Self {
+            let (tx, mut rx) = tokio::sync::mpsc::channel(64);
+            tokio::spawn(async move {
+                while let Some(opt) = rx.recv().await {
+                    match opt {
+                        InvokerOpt::Invoke(invocation, tx) => {
+                            let result = invoker.invoke(invocation).await;
+                            let callback_ret = tx.send(result);
+                            match callback_ret {
+                                Ok(_) => {}
+                                Err(Err(err)) => {
+                                    error!("invoke method has been called, but 
callback to caller failed. {:?}", err);
+                                }
+                                _ => {}
+                            }
+                        }
+                        InvokerOpt::Url(tx) => {
+                            let ret = tx.send(invoker.url().await);
+                            match ret {
+                                Ok(_) => {}
+                                Err(err) => {
+                                    error!("url method has been called, but 
callback to caller failed. {:?}", err);
+                                }
+                            }
+                        }
+                    }
+                }
+            });
+            InvokerProxy { tx }
+        }
+    }
+
+    #[derive(Error, Debug)]
+    #[error("invoker proxy error: {0}")]
+    pub struct InvokerProxyError(String);
+
+    impl InvokerProxyError {
+        pub fn new(msg: &str) -> Self {
+            InvokerProxyError(msg.to_string())
+        }
+    }
+}
+
+#[derive(Default)]
+pub(super) struct InvokerExtensionLoader {
+    factories: HashMap<String, InvokerExtensionFactory>,
+}
+
+impl InvokerExtensionLoader {
+    pub fn register(&mut self, extension_name: String, factory: 
InvokerExtensionFactory) {
+        self.factories.insert(extension_name, factory);
+    }
+
+    pub fn remove(&mut self, extension_name: String) {
+        self.factories.remove(&extension_name);
+    }
+
+    pub fn load(&mut self, url: Url) -> 
Result<LoadExtensionPromise<InvokerProxy>, StdError> {
+        let extension_name = url.query::<ExtensionName>();
+        let Some(extension_name) = extension_name else {
+            return Err(InvokerExtensionLoaderError::new(
+                "load invoker extension failed, extension mustn't be empty",
+            )
+            .into());
+        };
+        let extension_name = extension_name.value();
+        let factory = self.factories.get_mut(&extension_name);
+        let Some(factory) = factory else {
+            let err_msg = format!(
+                "load {} invoker extension failed, can not found extension 
factory",
+                extension_name
+            );
+            return Err(InvokerExtensionLoaderError(err_msg).into());
+        };
+        factory.create(url)
+    }
+}
+
+type InvokerExtensionConstructor = fn(
+    Url,
+) -> Pin<
+    Box<dyn Future<Output = Result<Box<dyn Invoker + Send + 'static>, 
StdError>> + Send + 'static>,
+>;
+pub(crate) struct InvokerExtensionFactory {
+    constructor: InvokerExtensionConstructor,
+    instances: HashMap<String, LoadExtensionPromise<InvokerProxy>>,
+}
+
+impl InvokerExtensionFactory {
+    pub fn new(constructor: InvokerExtensionConstructor) -> Self {
+        Self {
+            constructor,
+            instances: HashMap::default(),
+        }
+    }
+}
+
+impl InvokerExtensionFactory {
+    pub fn create(&mut self, url: Url) -> 
Result<LoadExtensionPromise<InvokerProxy>, StdError> {
+        let key = url.to_string();
+
+        match self.instances.get(&key) {
+            Some(instance) => Ok(instance.clone()),
+            None => {
+                let constructor = self.constructor;
+                let creator = move |url: Url| {
+                    let invoker_future = constructor(url);
+                    Box::pin(async move {
+                        let invoker = invoker_future.await?;
+                        Ok(InvokerProxy::from(invoker))
+                    })
+                        as Pin<
+                            Box<
+                                dyn Future<Output = Result<InvokerProxy, 
StdError>>
+                                    + Send
+                                    + 'static,
+                            >,
+                        >
+                };
+
+                let promise: LoadExtensionPromise<InvokerProxy> =
+                    LoadExtensionPromise::new(Box::new(creator), url);
+                self.instances.insert(key, promise.clone());
+                Ok(promise)
+            }
+        }
+    }
+}
+
+pub struct InvokerExtension<T>(PhantomData<T>)
+where
+    T: Invoker + Send + 'static;
+
+impl<T> ExtensionMetaInfo for InvokerExtension<T>
+where
+    T: Invoker + Send + 'static,
+    T: Extension<Target = Box<dyn Invoker + Send + 'static>>,
+{
+    fn name() -> String {
+        T::name()
+    }
+
+    fn extension_type() -> ExtensionType {
+        ExtensionType::Invoker
+    }
+
+    fn extension_factory() -> ExtensionFactories {
+        
ExtensionFactories::InvokerExtensionFactory(InvokerExtensionFactory::new(
+            <T as Extension>::create,
+        ))
+    }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub struct InvokerExtensionLoaderError(String);
+
+impl InvokerExtensionLoaderError {
+    pub fn new(msg: &str) -> Self {
+        InvokerExtensionLoaderError(msg.to_string())
+    }
+}
diff --git a/dubbo/src/extension/mod.rs b/dubbo/src/extension/mod.rs
index 1229ff4..724b64c 100644
--- a/dubbo/src/extension/mod.rs
+++ b/dubbo/src/extension/mod.rs
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
+mod invoker_extension;
 pub mod registry_extension;
 
 use crate::{
-    extension::registry_extension::proxy::RegistryProxy,
+    extension::{
+        invoker_extension::proxy::InvokerProxy,
+        registry_extension::{proxy::RegistryProxy, RegistryExtension},
+    },
     logger::tracing::{error, info},
     params::extension_param::ExtensionType,
     registry::registry::StaticRegistry,
@@ -35,6 +39,7 @@ pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
 #[derive(Default)]
 struct ExtensionDirectory {
     registry_extension_loader: registry_extension::RegistryExtensionLoader,
+    invoker_extension_loader: invoker_extension::InvokerExtensionLoader,
 }
 
 impl ExtensionDirectory {
@@ -47,8 +52,8 @@ impl ExtensionDirectory {
             // register static registry extension
             let _ = extension_directory.register(
                 StaticRegistry::name(),
-                StaticRegistry::convert_to_extension_factories(),
-                ExtensionType::Registry,
+                RegistryExtension::<StaticRegistry>::extension_factory(),
+                RegistryExtension::<StaticRegistry>::extension_type(),
             );
 
             while let Some(extension_opt) = rx.recv().await {
@@ -93,6 +98,15 @@ impl ExtensionDirectory {
                         .register(extension_name, registry_extension_factory);
                     Ok(())
                 }
+                _ => Ok(()),
+            },
+            ExtensionType::Invoker => match extension_factories {
+                
ExtensionFactories::InvokerExtensionFactory(invoker_extension_factory) => {
+                    self.invoker_extension_loader
+                        .register(extension_name, invoker_extension_factory);
+                    Ok(())
+                }
+                _ => Ok(()),
             },
         }
     }
@@ -107,6 +121,10 @@ impl ExtensionDirectory {
                 self.registry_extension_loader.remove(extension_name);
                 Ok(())
             }
+            ExtensionType::Invoker => {
+                self.invoker_extension_loader.remove(extension_name);
+                Ok(())
+            }
         }
     }
 
@@ -128,14 +146,37 @@ impl ExtensionDirectory {
                                     let _ = 
callback.send(Ok(Extensions::Registry(extension)));
                                 }
                                 Err(err) => {
-                                    error!("load extension failed: {}", err);
+                                    error!("load registry extension failed: 
{}", err);
                                     let _ = callback.send(Err(err));
                                 }
                             }
                         });
                     }
                     Err(err) => {
-                        error!("load extension failed: {}", err);
+                        error!("load registry extension failed: {}", err);
+                        let _ = callback.send(Err(err));
+                    }
+                }
+            }
+            ExtensionType::Invoker => {
+                let extension = self.invoker_extension_loader.load(url);
+                match extension {
+                    Ok(mut extension) => {
+                        tokio::spawn(async move {
+                            let invoker = extension.resolve().await;
+                            match invoker {
+                                Ok(invoker) => {
+                                    let _ = 
callback.send(Ok(Extensions::Invoker(invoker)));
+                                }
+                                Err(err) => {
+                                    error!("load invoker extension failed: 
{}", err);
+                                    let _ = callback.send(Err(err));
+                                }
+                            }
+                        });
+                    }
+                    Err(err) => {
+                        error!("load invoker extension failed: {}", err);
                         let _ = callback.send(Err(err));
                     }
                 }
@@ -241,12 +282,10 @@ impl ExtensionDirectoryCommander {
     #[allow(private_bounds)]
     pub async fn register<T>(&self) -> Result<(), StdError>
     where
-        T: Extension,
         T: ExtensionMetaInfo,
-        T: ConvertToExtensionFactories,
     {
         let extension_name = T::name();
-        let extension_factories = T::convert_to_extension_factories();
+        let extension_factories = T::extension_factory();
         let extension_type = T::extension_type();
 
         info!(
@@ -286,7 +325,6 @@ impl ExtensionDirectoryCommander {
     #[allow(private_bounds)]
     pub async fn remove<T>(&self) -> Result<(), StdError>
     where
-        T: Extension,
         T: ExtensionMetaInfo,
     {
         let extension_name = T::name();
@@ -355,6 +393,45 @@ impl ExtensionDirectoryCommander {
 
         match extensions {
             Extensions::Registry(proxy) => Ok(proxy),
+            _ => {
+                panic!("load registry extension failed: invalid extension 
type");
+            }
+        }
+    }
+
+    pub async fn load_invoker(&self, url: Url) -> Result<InvokerProxy, 
StdError> {
+        let url_str = url.to_string();
+        info!("load invoker extension: {}", url_str);
+
+        let (tx, rx) = oneshot::channel();
+
+        let send = self
+            .sender
+            .send(ExtensionOpt::Load(url, ExtensionType::Invoker, tx))
+            .await;
+
+        let Ok(_) = send else {
+            let err_msg = format!("load invoker extension failed: {}", 
url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        let extensions = rx.await;
+
+        let Ok(extension) = extensions else {
+            let err_msg = format!("load invoker extension failed: {}", 
url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        let Ok(extensions) = extension else {
+            let err_msg = format!("load invoker extension failed: {}", 
url_str);
+            return Err(LoadExtensionError::new(err_msg).into());
+        };
+
+        match extensions {
+            Extensions::Invoker(proxy) => Ok(proxy),
+            _ => {
+                panic!("load invoker extension failed: invalid extension 
type");
+            }
         }
     }
 }
@@ -374,11 +451,9 @@ enum ExtensionOpt {
     ),
 }
 
-pub(crate) trait Sealed {}
-
 #[allow(private_bounds)]
 #[async_trait::async_trait]
-pub trait Extension: Sealed {
+pub trait Extension {
     type Target;
 
     fn name() -> String;
@@ -388,20 +463,19 @@ pub trait Extension: Sealed {
 
 #[allow(private_bounds)]
 pub(crate) trait ExtensionMetaInfo {
+    fn name() -> String;
     fn extension_type() -> ExtensionType;
+    fn extension_factory() -> ExtensionFactories;
 }
 
 pub(crate) enum Extensions {
     Registry(RegistryProxy),
+    Invoker(InvokerProxy),
 }
 
 pub(crate) enum ExtensionFactories {
     RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
-}
-
-#[allow(private_bounds)]
-pub(crate) trait ConvertToExtensionFactories {
-    fn convert_to_extension_factories() -> ExtensionFactories;
+    InvokerExtensionFactory(invoker_extension::InvokerExtensionFactory),
 }
 
 #[derive(Error, Debug)]
diff --git a/dubbo/src/extension/registry_extension.rs b/dubbo/src/extension/registry_extension.rs
index d625f02..2e9291b 100644
--- a/dubbo/src/extension/registry_extension.rs
+++ b/dubbo/src/extension/registry_extension.rs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-use std::{collections::HashMap, future::Future, pin::Pin};
+use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
 
 use async_trait::async_trait;
 use thiserror::Error;
@@ -30,8 +30,7 @@ use crate::{
 use proxy::RegistryProxy;
 
 use crate::extension::{
-    ConvertToExtensionFactories, Extension, ExtensionFactories, 
ExtensionMetaInfo, ExtensionType,
-    LoadExtensionPromise,
+    Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, 
LoadExtensionPromise,
 };
 
 // extension://0.0.0.0/?extension-type=registry&extension-name=nacos&registry-url=nacos://127.0.0.1:8848
@@ -63,24 +62,24 @@ pub trait Registry {
     fn url(&self) -> &Url;
 }
 
-impl<T> crate::extension::Sealed for T where T: Registry + Send + Sync + 
'static {}
+pub struct RegistryExtension<T>(PhantomData<T>)
+where
+    T: Registry + Send + Sync + 'static;
 
-impl<T> ExtensionMetaInfo for T
+impl<T> ExtensionMetaInfo for RegistryExtension<T>
 where
     T: Registry + Send + Sync + 'static,
     T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
 {
+    fn name() -> String {
+        T::name()
+    }
+
     fn extension_type() -> ExtensionType {
         ExtensionType::Registry
     }
-}
 
-impl<T> ConvertToExtensionFactories for T
-where
-    T: Registry + Send + Sync + 'static,
-    T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
-{
-    fn convert_to_extension_factories() -> ExtensionFactories {
+    fn extension_factory() -> ExtensionFactories {
         
ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(
             <T as Extension>::create,
         ))
diff --git a/dubbo/src/params/extension_param.rs b/dubbo/src/params/extension_param.rs
index 93e0a16..08ec1c9 100644
--- a/dubbo/src/params/extension_param.rs
+++ b/dubbo/src/params/extension_param.rs
@@ -51,6 +51,7 @@ impl FromStr for ExtensionName {
 
 pub enum ExtensionType {
     Registry,
+    Invoker,
 }
 
 impl UrlParam for ExtensionType {
@@ -63,12 +64,14 @@ impl UrlParam for ExtensionType {
     fn value(&self) -> Self::TargetType {
         match self {
             ExtensionType::Registry => "registry".to_owned(),
+            ExtensionType::Invoker => "invoker".to_owned(),
         }
     }
 
     fn as_str(&self) -> Cow<str> {
         match self {
             ExtensionType::Registry => Cow::Borrowed("registry"),
+            ExtensionType::Invoker => Cow::Borrowed("invoker"),
         }
     }
 }
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index fc48dc5..ee8cc1e 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -43,7 +43,9 @@ pub mod echo_client {
             let invocation = RpcInvocation::default()
                 
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("UnaryEcho"));
-            let path = 
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
+            let path = http::uri::PathAndQuery::from_static(
+                "/grpc.examples.echo.Echo/UnaryEcho",
+            );
             self.inner.unary(request, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
@@ -100,7 +102,9 @@ pub mod echo_server {
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the ServerStreamingEcho method.
-        type ServerStreamingEchoStream: futures_util::Stream<Item = 
Result<super::EchoResponse, dubbo::status::Status>>
+        type ServerStreamingEchoStream: futures_util::Stream<
+                Item = Result<super::EchoResponse, dubbo::status::Status>,
+            >
             + Send
             + 'static;
         /// ServerStreamingEcho is server side streaming.
@@ -114,14 +118,19 @@ pub mod echo_server {
             request: Request<Decoding<super::EchoRequest>>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the BidirectionalStreamingEcho method.
-        type BidirectionalStreamingEchoStream: futures_util::Stream<Item = 
Result<super::EchoResponse, dubbo::status::Status>>
+        type BidirectionalStreamingEchoStream: futures_util::Stream<
+                Item = Result<super::EchoResponse, dubbo::status::Status>,
+            >
             + Send
             + 'static;
         /// BidirectionalStreamingEcho is bidi streaming.
         async fn bidirectional_streaming_echo(
             &self,
             request: Request<Decoding<super::EchoRequest>>,
-        ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, 
dubbo::status::Status>;
+        ) -> Result<
+            Response<Self::BidirectionalStreamingEchoStream>,
+            dubbo::status::Status,
+        >;
     }
     /// Echo is the echo service.
     #[derive(Debug)]
@@ -151,7 +160,10 @@ pub mod echo_server {
         type Response = http::Response<BoxBody>;
         type Error = std::convert::Infallible;
         type Future = BoxFuture<Self::Response, Self::Error>;
-        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
+        fn poll_ready(
+            &mut self,
+            _cx: &mut Context<'_>,
+        ) -> Poll<Result<(), Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -164,16 +176,24 @@ pub mod echo_server {
                     }
                     impl<T: Echo> UnarySvc<super::EchoRequest> for 
UnaryEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<Response<Self::Response>, 
dubbo::status::Status>;
-                        fn call(&mut self, request: 
Request<super::EchoRequest>) -> Self::Future {
+                        type Future = BoxFuture<
+                            Response<Self::Response>,
+                            dubbo::status::Status,
+                        >;
+                        fn call(
+                            &mut self,
+                            request: Request<super::EchoRequest>,
+                        ) -> Self::Future {
                             let inner = self.inner.0.clone();
                             let fut = async move { 
inner.unary_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server =
-                            TripleServer::<super::EchoRequest, 
super::EchoResponse>::new();
+                        let mut server = TripleServer::<
+                            super::EchoRequest,
+                            super::EchoResponse,
+                        >::new();
                         let res = server.unary(UnaryEchoServer { inner }, 
req).await;
                         Ok(res)
                     };
@@ -184,20 +204,30 @@ pub mod echo_server {
                     struct ServerStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for 
ServerStreamingEchoServer<T> {
+                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
+                    for ServerStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::ServerStreamingEchoStream;
-                        type Future =
-                            BoxFuture<Response<Self::ResponseStream>, 
dubbo::status::Status>;
-                        fn call(&mut self, request: 
Request<super::EchoRequest>) -> Self::Future {
+                        type Future = BoxFuture<
+                            Response<Self::ResponseStream>,
+                            dubbo::status::Status,
+                        >;
+                        fn call(
+                            &mut self,
+                            request: Request<super::EchoRequest>,
+                        ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move { 
inner.server_streaming_echo(request).await };
+                            let fut = async move {
+                                inner.server_streaming_echo(request).await
+                            };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server =
-                            TripleServer::<super::EchoRequest, 
super::EchoResponse>::new();
+                        let mut server = TripleServer::<
+                            super::EchoRequest,
+                            super::EchoResponse,
+                        >::new();
                         let res = server
                             .server_streaming(ServerStreamingEchoServer { 
inner }, req)
                             .await;
@@ -210,21 +240,29 @@ pub mod echo_server {
                     struct ClientStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for 
ClientStreamingEchoServer<T> {
+                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
+                    for ClientStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<Response<Self::Response>, 
dubbo::status::Status>;
+                        type Future = BoxFuture<
+                            Response<Self::Response>,
+                            dubbo::status::Status,
+                        >;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move { 
inner.client_streaming_echo(request).await };
+                            let fut = async move {
+                                inner.client_streaming_echo(request).await
+                            };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server =
-                            TripleServer::<super::EchoRequest, 
super::EchoResponse>::new();
+                        let mut server = TripleServer::<
+                            super::EchoRequest,
+                            super::EchoResponse,
+                        >::new();
                         let res = server
                             .client_streaming(ClientStreamingEchoServer { 
inner }, req)
                             .await;
@@ -237,39 +275,54 @@ pub mod echo_server {
                     struct BidirectionalStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> StreamingSvc<super::EchoRequest> for 
BidirectionalStreamingEchoServer<T> {
+                    impl<T: Echo> StreamingSvc<super::EchoRequest>
+                    for BidirectionalStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = 
T::BidirectionalStreamingEchoStream;
-                        type Future =
-                            BoxFuture<Response<Self::ResponseStream>, 
dubbo::status::Status>;
+                        type Future = BoxFuture<
+                            Response<Self::ResponseStream>,
+                            dubbo::status::Status,
+                        >;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut =
-                                async move { 
inner.bidirectional_streaming_echo(request).await };
+                            let fut = async move {
+                                
inner.bidirectional_streaming_echo(request).await
+                            };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server =
-                            TripleServer::<super::EchoRequest, 
super::EchoResponse>::new();
+                        let mut server = TripleServer::<
+                            super::EchoRequest,
+                            super::EchoResponse,
+                        >::new();
                         let res = server
-                            .bidi_streaming(BidirectionalStreamingEchoServer { 
inner }, req)
+                            .bidi_streaming(
+                                BidirectionalStreamingEchoServer {
+                                    inner,
+                                },
+                                req,
+                            )
                             .await;
                         Ok(res)
                     };
                     Box::pin(fut)
                 }
-                _ => Box::pin(async move {
-                    Ok(http::Response::builder()
-                        .status(200)
-                        .header("grpc-status", "12")
-                        .header("content-type", "application/grpc")
-                        .body(empty_body())
-                        .unwrap())
-                }),
+                _ => {
+                    Box::pin(async move {
+                        Ok(
+                            http::Response::builder()
+                                .status(200)
+                                .header("grpc-status", "12")
+                                .header("content-type", "application/grpc")
+                                .body(empty_body())
+                                .unwrap(),
+                        )
+                    })
+                }
             }
         }
     }
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index d2edae5..f6fea89 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -22,7 +22,7 @@ pub mod protos {
 
 use dubbo::codegen::*;
 
-use dubbo::extension;
+use dubbo::{extension, extension::registry_extension::RegistryExtension};
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
 use registry_nacos::NacosRegistry;
@@ -31,7 +31,9 @@ use registry_nacos::NacosRegistry;
 async fn main() {
     dubbo::logger::init();
 
-    let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
+    let _ = extension::EXTENSIONS
+        .register::<RegistryExtension<NacosRegistry>>()
+        .await;
 
     let builder = 
ClientBuilder::new().with_registry("nacos://127.0.0.1:8848".parse().unwrap());
 
diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs
index 17a3ac4..aecc1f8 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -27,6 +27,7 @@ use dubbo::{
     codegen::*,
     config::RootConfig,
     extension,
+    extension::registry_extension::RegistryExtension,
     logger::{
         tracing::{info, span},
         Level,
@@ -60,7 +61,9 @@ async fn main() {
         Err(_err) => panic!("err: {:?}", _err), // response was droped
     };
 
-    let _ = extension::EXTENSIONS.register::<NacosRegistry>().await;
+    let _ = extension::EXTENSIONS
+        .register::<RegistryExtension<NacosRegistry>>()
+        .await;
     let mut f = Dubbo::new()
         .with_config(r)
         .add_registry("nacos://127.0.0.1:8848/");


Reply via email to