This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new c0b257adac issue #11458:Triple stub support async mode (#11464)
c0b257adac is described below

commit c0b257adaca4263cb4af359150b3f8012381331a
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Mon Feb 6 17:28:49 2023 +0800

    issue #11458:Triple stub support async mode (#11464)
    
    * issue #11458:Triple stub support async mode
    
    * issue 11458:Triple stub support async mode
    
    * issue 11458:Triple stub support async mode
---
 .../src/main/resources/Dubbo3TripleStub.mustache   | 33 ++++++++++++++++++++++
 .../apache/dubbo/rpc/stub/StubInvocationUtil.java  |  2 ++
 2 files changed, 35 insertions(+)

diff --git a/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache 
b/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 77645108de..af383bdcbc 100644
--- a/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -72,6 +72,17 @@ public final class {{className}} {
     {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.UNARY,
     obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
+
+    private static final StubMethodDescriptor {{methodName}}AsyncMethod = new 
StubMethodDescriptor("{{originMethodName}}",
+    {{inputType}}.class, java.util.concurrent.CompletableFuture.class, 
serviceDescriptor, MethodDescriptor.RpcType.UNARY,
+    obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+    {{outputType}}::parseFrom);
+
+    private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod = 
new StubMethodDescriptor("{{originMethodName}}Async",
+    {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.UNARY,
+    obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+    {{outputType}}::parseFrom);
+
 {{/unaryMethods}}
 
 {{#serverStreamingMethods}}
@@ -121,6 +132,10 @@ public final class {{className}} {
             return StubInvocationUtil.unaryCall(invoker, {{methodName}}Method, 
request);
         }
 
+        public CompletableFuture<{{outputType}}> 
{{methodName}}Async({{inputType}} request){
+            return StubInvocationUtil.unaryCall(invoker, 
{{methodName}}AsyncMethod, request);
+        }
+
         {{#javaDoc}}
             {{{javaDoc}}}
         {{/javaDoc}}
@@ -163,6 +178,21 @@ public final class {{className}} {
 
     public static abstract class {{interfaceClassName}}ImplBase implements 
{{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
 
+        private <T, R> BiConsumer<T, StreamObserver<R>> 
syncToAsync(java.util.function.Function<T, R> syncFun) {
+            return new BiConsumer<T, StreamObserver<R>>() {
+                @Override
+                public void accept(T t, StreamObserver<R> observer) {
+                    try {
+                        R ret = syncFun.apply(t);
+                        observer.onNext(ret);
+                        observer.onCompleted();
+                    } catch (Throwable e) {
+                        observer.onError(e);
+                    }
+                }
+            };
+        }
+
         @Override
         public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
             PathResolver pathResolver = url.getOrDefaultFrameworkModel()
@@ -172,11 +202,14 @@ public final class {{className}} {
 
         {{#methods}}
             pathResolver.addNativeStub( "/" + SERVICE_NAME + 
"/{{originMethodName}}" );
+            pathResolver.addNativeStub( "/" + SERVICE_NAME + 
"/{{originMethodName}}Async" );
         {{/methods}}
 
         {{#unaryMethods}}
             BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> 
{{methodName}}Func = this::{{methodName}};
             handlers.put({{methodName}}Method.getMethodName(), new 
UnaryStubMethodHandler<>({{methodName}}Func));
+            BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> 
{{methodName}}AsyncFunc = syncToAsync(this::{{methodName}});
+            handlers.put({{methodName}}ProxyAsyncMethod.getMethodName(), new 
UnaryStubMethodHandler<>({{methodName}}AsyncFunc));
         {{/unaryMethods}}
 
         {{#serverStreamingMethods}}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
index cb5232744f..123cbbfd27 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
@@ -58,6 +58,8 @@ public class StubInvocationUtil {
             methodDescriptor.getMethodName(), invoker.getInterface().getName(),
             invoker.getUrl().getProtocolServiceKey(), 
methodDescriptor.getParameterClasses(),
             arguments);
+        // Set the return type explicitly: when multiple MethodDescriptors share the same method 
name, the return type would otherwise be wrong
+        rpcInvocation.setReturnType(methodDescriptor.getReturnClass());
         try {
             return InvocationUtil.invoke(invoker, rpcInvocation);
         } catch (Throwable e) {

Reply via email to