This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new c0b257adac issue #11458:Triple stub support async mode (#11464)
c0b257adac is described below
commit c0b257adaca4263cb4af359150b3f8012381331a
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Mon Feb 6 17:28:49 2023 +0800
issue #11458:Triple stub support async mode (#11464)
* issue #11458:Triple stub support async mode
* issue 11458:Triple stub support async mode
* issue 11458:Triple stub support async mode
---
.../src/main/resources/Dubbo3TripleStub.mustache | 33 ++++++++++++++++++++++
.../apache/dubbo/rpc/stub/StubInvocationUtil.java | 2 ++
2 files changed, 35 insertions(+)
diff --git a/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
b/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 77645108de..af383bdcbc 100644
--- a/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -72,6 +72,17 @@ public final class {{className}} {
{{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
+
+ private static final StubMethodDescriptor {{methodName}}AsyncMethod = new
StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, java.util.concurrent.CompletableFuture.class,
serviceDescriptor, MethodDescriptor.RpcType.UNARY,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+
+ private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod =
new StubMethodDescriptor("{{originMethodName}}Async",
+ {{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.UNARY,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+
{{/unaryMethods}}
{{#serverStreamingMethods}}
@@ -121,6 +132,10 @@ public final class {{className}} {
return StubInvocationUtil.unaryCall(invoker, {{methodName}}Method,
request);
}
+ public CompletableFuture<{{outputType}}>
{{methodName}}Async({{inputType}} request){
+ return StubInvocationUtil.unaryCall(invoker,
{{methodName}}AsyncMethod, request);
+ }
+
{{#javaDoc}}
{{{javaDoc}}}
{{/javaDoc}}
@@ -163,6 +178,21 @@ public final class {{className}} {
public static abstract class {{interfaceClassName}}ImplBase implements
{{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
+ private <T, R> BiConsumer<T, StreamObserver<R>>
syncToAsync(java.util.function.Function<T, R> syncFun) {
+ return new BiConsumer<T, StreamObserver<R>>() {
+ @Override
+ public void accept(T t, StreamObserver<R> observer) {
+ try {
+ R ret = syncFun.apply(t);
+ observer.onNext(ret);
+ observer.onCompleted();
+ } catch (Throwable e) {
+ observer.onError(e);
+ }
+ }
+ };
+ }
+
@Override
public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
PathResolver pathResolver = url.getOrDefaultFrameworkModel()
@@ -172,11 +202,14 @@ public final class {{className}} {
{{#methods}}
pathResolver.addNativeStub( "/" + SERVICE_NAME +
"/{{originMethodName}}" );
+ pathResolver.addNativeStub( "/" + SERVICE_NAME +
"/{{originMethodName}}Async" );
{{/methods}}
{{#unaryMethods}}
BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>>
{{methodName}}Func = this::{{methodName}};
handlers.put({{methodName}}Method.getMethodName(), new
UnaryStubMethodHandler<>({{methodName}}Func));
+ BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>>
{{methodName}}AsyncFunc = syncToAsync(this::{{methodName}});
+ handlers.put({{methodName}}ProxyAsyncMethod.getMethodName(), new
UnaryStubMethodHandler<>({{methodName}}AsyncFunc));
{{/unaryMethods}}
{{#serverStreamingMethods}}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
index cb5232744f..123cbbfd27 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
@@ -58,6 +58,8 @@ public class StubInvocationUtil {
methodDescriptor.getMethodName(), invoker.getInterface().getName(),
invoker.getUrl().getProtocolServiceKey(),
methodDescriptor.getParameterClasses(),
arguments);
+ //When there are multiple MethodDescriptors with the same method name,
the return type will be wrong
+ rpcInvocation.setReturnType(methodDescriptor.getReturnClass());
try {
return InvocationUtil.invoke(invoker, rpcInvocation);
} catch (Throwable e) {