This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new e069a7af9e Support dubbo reactive stream with project reactor (#10290)
e069a7af9e is described below
commit e069a7af9e2d5b920b63845d5228b6b3deca1a8c
Author: Kunshuai Zhu <[email protected]>
AuthorDate: Mon Aug 15 10:44:48 2022 +0800
Support dubbo reactive stream with project reactor (#10290)
* Support dubbo reactive stream with project reactor
* basically successfully connected
* Solve NPE on the first call to CallStreamObserver#request(int)
* Polish code, add comment and license
* Optimize dependency configuration
* Add manyToMany and oneToOne
* Add manyToOne
* Add ReactorDubbo3TripleGenerator
* remove unnecessary commits
* Add license header
* Add comment
* remove unused import
* fix npe in TripleInvoker
* Optimize manyToOne
* Polish
* polish
* Fix reduce operation
* Add unit test
* Refactor TripleInvoker#invokeUnary to use UnaryClientCallListener
* merge CancelableStreamObserver and ClientResponseObserver
* discard repeated subscribe silently
* remove AtomicReferenceFieldUpdater
* remove unused volatile
* polish
* polish
* move SafeRequestStreamObserver to CancelableStreamObserver
* remove SafeRequestObserver
* set onStartConsumer empty by default
* Fix unexpected invoke before subscribe in oneToMany & manyToOne
---
.../tri/reactive/ReactorDubbo3TripleGenerator.java | 62 +++++++
.../ReactorDubbo3TripleInterfaceStub.mustache | 41 +++++
.../resources/ReactorDubbo3TripleStub.mustache | 180 +++++++++++++++++++++
dubbo-rpc/dubbo-rpc-triple/pom.xml | 21 ++-
.../rpc/protocol/tri/CancelableStreamObserver.java | 12 ++
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 14 +-
.../call/ObserverToClientCallListenerAdapter.java | 8 +
.../tri/observer/ServerCallToObserverAdapter.java | 2 +-
.../reactive/AbstractTripleReactorPublisher.java | 169 +++++++++++++++++++
.../reactive/AbstractTripleReactorSubscriber.java | 106 ++++++++++++
.../tri/reactive/ClientTripleReactorPublisher.java | 44 +++++
.../ClientTripleReactorSubscriber.java} | 25 ++-
.../ServerTripleReactorPublisher.java} | 24 ++-
.../reactive/ServerTripleReactorSubscriber.java | 44 +++++
.../tri/reactive/calls/ReactorClientCalls.java | 143 ++++++++++++++++
.../tri/reactive/calls/ReactorServerCalls.java | 136 ++++++++++++++++
.../reactive/handler/ManyToManyMethodHandler.java | 47 ++++++
.../reactive/handler/ManyToOneMethodHandler.java | 48 ++++++
.../reactive/handler/OneToManyMethodHandler.java | 48 ++++++
.../reactive/handler/OneToOneMethodHandler.java | 49 ++++++
.../tri/reactive/ManyToManyMethodHandlerTest.java | 64 ++++++++
.../tri/reactive/ManyToOneMethodHandlerTest.java | 92 +++++++++++
.../tri/reactive/OneToManyMethodHandlerTest.java | 90 +++++++++++
.../tri/reactive/OneToOneMethodHandlerTest.java | 41 +++++
24 files changed, 1473 insertions(+), 37 deletions(-)
diff --git
a/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
b/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
new file mode 100644
index 0000000000..dfc8d86e61
--- /dev/null
+++
b/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.gen.tri.reactive;
+
+import com.salesforce.jprotoc.ProtocPlugin;
+import org.apache.dubbo.gen.AbstractGenerator;
+
+public class ReactorDubbo3TripleGenerator extends AbstractGenerator {
+
+ public static void main(String[] args) {
+ if (args.length == 0) {
+ ProtocPlugin.generate(new ReactorDubbo3TripleGenerator());
+ } else {
+ ProtocPlugin.debug(new ReactorDubbo3TripleGenerator(), args[0]);
+ }
+ }
+
+ @Override
+ protected String getClassPrefix() {
+ return "Dubbo";
+ }
+
+ @Override
+ protected String getClassSuffix() {
+ return "Triple";
+ }
+
+ @Override
+ protected String getTemplateFileName() {
+ return "ReactorDubbo3TripleStub.mustache";
+ }
+
+ @Override
+ protected String getInterfaceTemplateFileName() {
+ return "ReactorDubbo3TripleInterfaceStub.mustache";
+ }
+
+ @Override
+ protected String getSingleTemplateFileName() {
+ throw new IllegalStateException("Do not support single template!");
+ }
+
+ @Override
+ protected boolean enableMultipleTemplateFiles() {
+ return true;
+ }
+}
diff --git
a/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache
new file mode 100644
index 0000000000..b3b900831e
--- /dev/null
+++
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache
@@ -0,0 +1,41 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface {{interfaceClassName}} {
+
+ String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+
+ String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+
+{{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
reactorRequest) ;
+
+{{/methods}}
+}
diff --git a/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
new file mode 100644
index 0000000000..43b715e697
--- /dev/null
+++ b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
@@ -0,0 +1,180 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import com.google.protobuf.Message;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.PathResolver;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.ServerService;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToManyMethodHandler;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToOneMethodHandler;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToManyMethodHandler;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorClientCalls;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToOneMethodHandler;
+
+import org.apache.dubbo.rpc.stub.StubInvoker;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import org.apache.dubbo.rpc.stub.StubSuppliers;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class {{className}} {
+
+ private {{className}}() {}
+
+ public static final String SERVICE_NAME =
{{interfaceClassName}}.SERVICE_NAME;
+
+ private static final StubServiceDescriptor serviceDescriptor = new
StubServiceDescriptor(SERVICE_NAME,{{interfaceClassName}}.class);
+
+ static {
+ StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub);
+ StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME,
{{className}}::newStub);
+ StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor);
+ StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME,
serviceDescriptor);
+ }
+
+ @SuppressWarnings("all")
+ public static {{interfaceClassName}} newStub(Invoker<?> invoker) {
+ return new
{{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker);
+ }
+
+{{#unaryMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.UNARY,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.SERVER_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/serverStreamingMethods}}
+
+{{#clientStreamingMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.CLIENT_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/clientStreamingMethods}}
+
+{{#biStreamingWithoutClientStreamMethods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
+ {{inputType}}.class, {{outputType}}.class, serviceDescriptor,
MethodDescriptor.RpcType.BI_STREAM,
+ obj -> ((Message) obj).toByteArray(), obj -> ((Message)
obj).toByteArray(), {{inputType}}::parseFrom,
+ {{outputType}}::parseFrom);
+{{/biStreamingWithoutClientStreamMethods}}
+
+ public static class {{interfaceClassName}}Stub implements
{{interfaceClassName}}{
+
+ private final Invoker<{{interfaceClassName}}> invoker;
+
+ public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}>
invoker) {
+ this.invoker = invoker;
+ }
+
+ {{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ public
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
request) {
+ return ReactorClientCalls.{{reactiveCallsMethodName}}(invoker,
request, {{methodNameCamelCase}}Method);
+ }
+ {{/methods}}
+ }
+
+ public static abstract class {{interfaceClassName}}ImplBase implements
{{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
+
+ @Override
+ public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
+ PathResolver pathResolver = url.getOrDefaultFrameworkModel()
+ .getExtensionLoader(PathResolver.class)
+ .getDefaultExtension();
+ Map<String,StubMethodHandler<?, ?>> handlers = new HashMap<>();
+
+ {{#methods}}
+ pathResolver.addNativeStub( "/" + SERVICE_NAME +
"/{{originMethodName}}" );
+ {{/methods}}
+
+ {{#unaryMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new
OneToOneMethodHandler<>(this::{{methodName}}));
+ {{/unaryMethods}}
+ {{#serverStreamingMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new
OneToManyMethodHandler<>(this::{{methodName}}));
+ {{/serverStreamingMethods}}
+ {{#clientStreamingMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new
ManyToOneMethodHandler<>(this::{{methodName}}));
+ {{/clientStreamingMethods}}
+ {{#biStreamingWithoutClientStreamMethods}}
+ handlers.put({{methodName}}Method.getMethodName(), new
ManyToManyMethodHandler<>(this::{{methodName}}));
+ {{/biStreamingWithoutClientStreamMethods}}
+
+ return new StubInvoker<>(this, url, {{interfaceClassName}}.class,
handlers);
+ }
+
+ {{#methods}}
+ {{#javaDoc}}
+ {{{javaDoc}}}
+ {{/javaDoc}}
+ {{#deprecated}}
+ @java.lang.Deprecated
+ {{/deprecated}}
+ public
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
request) {
+ throw unimplementedMethodException({{methodName}}Method);
+ }
+ {{/methods}}
+
+ @Override
+ public final ServiceDescriptor getServiceDescriptor() {
+ return serviceDescriptor;
+ }
+
+ private RpcException unimplementedMethodException(StubMethodDescriptor
methodDescriptor) {
+ return
TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is
unimplemented",
+ "/" + serviceDescriptor.getInterfaceName() + "/" +
methodDescriptor.getMethodName())).asException();
+ }
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml
b/dubbo-rpc/dubbo-rpc-triple/pom.xml
index 9ff3afcbae..81e46d1cda 100644
--- a/dubbo-rpc/dubbo-rpc-triple/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml
@@ -29,8 +29,23 @@
<description>The triple protocol module</description>
<properties>
<skip_maven_deploy>false</skip_maven_deploy>
+ <dubbo.compiler.version>0.0.4.1-SNAPSHOT</dubbo.compiler.version>
+ <reactive.version>1.0.4</reactive.version>
+ <reactor.version>3.4.19</reactor.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ <version>${reactive.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>${reactor.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-rpc-api</artifactId>
@@ -63,12 +78,6 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- <version>3.4.16</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 3a059bc5fb..76ae697ed4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
public abstract class CancelableStreamObserver<T> implements StreamObserver<T>
{
@@ -28,8 +29,19 @@ public abstract class CancelableStreamObserver<T> implements
StreamObserver<T> {
this.cancellationContext = cancellationContext;
}
+ public CancellationContext getCancellationContext() {
+ return cancellationContext;
+ }
+
public void cancel(Throwable throwable) {
cancellationContext.cancel(throwable);
}
+ public void beforeStart(final ClientCallToObserverAdapter<T>
clientCallToObserverAdapter) {
+ // do nothing
+ }
+
+ public void startRequest() {
+ // do nothing
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index ce2ee44066..e358efffca 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -46,6 +46,7 @@ import
org.apache.dubbo.rpc.protocol.tri.call.ObserverToClientCallListenerAdapte
import org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.support.RpcUtils;
import io.netty.util.AsciiString;
@@ -174,14 +175,18 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
StreamObserver<Object> streamCall(ClientCall call,
RequestMetadata metadata,
StreamObserver<Object> responseObserver) {
+ ObserverToClientCallListenerAdapter listener = new
ObserverToClientCallListenerAdapter(
+ responseObserver);
+ StreamObserver<Object> streamObserver = call.start(metadata, listener);
if (responseObserver instanceof CancelableStreamObserver) {
final CancellationContext context = new CancellationContext();
- ((CancelableStreamObserver<Object>)
responseObserver).setCancellationContext(context);
+ CancelableStreamObserver<Object> cancelableStreamObserver =
(CancelableStreamObserver<Object>) responseObserver;
+ cancelableStreamObserver.setCancellationContext(context);
context.addListener(context1 -> call.cancelByLocal(new
IllegalStateException("Canceled by app")));
+ listener.setOnStartConsumer(dummy ->
cancelableStreamObserver.startRequest());
+
cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter<Object>)
streamObserver);
}
- ObserverToClientCallListenerAdapter listener = new
ObserverToClientCallListenerAdapter(
- responseObserver);
- return call.start(metadata, listener);
+ return streamObserver;
}
AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation
invocation,
@@ -204,6 +209,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
result = new AsyncRpcResult(future, invocation);
FutureContext.getContext().setCompatibleFuture(future);
+
result.setExecutor(callbackExecutor);
ClientCall.Listener callListener = new UnaryClientCallListener(future);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 5fac300f6f..1c934ce970 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -21,16 +21,22 @@ import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.TriRpcStatus;
import java.util.Map;
+import java.util.function.Consumer;
public class ObserverToClientCallListenerAdapter implements
ClientCall.Listener {
private final StreamObserver<Object> delegate;
private ClientCall call;
+ private Consumer<ClientCall> onStartConsumer = clientCall -> { };
public ObserverToClientCallListenerAdapter(StreamObserver<Object>
delegate) {
this.delegate = delegate;
}
+ public void setOnStartConsumer(Consumer<ClientCall> onStartConsumer) {
+ this.onStartConsumer = onStartConsumer;
+ }
+
@Override
public void onMessage(Object message) {
delegate.onNext(message);
@@ -54,5 +60,7 @@ public class ObserverToClientCallListenerAdapter implements
ClientCall.Listener
if (call.isAutoRequest()) {
call.request(1);
}
+
+ onStartConsumer.accept(call);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
index edcd555a21..18bb578f01 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
@@ -47,7 +47,7 @@ public class ServerCallToObserverAdapter<T> extends
CancelableStreamObserver<T>
}
- private boolean isTerminated() {
+ public boolean isTerminated() {
return terminated;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
new file mode 100644
index 0000000000..9eab281828
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * The middle layer between {@link
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive
API. <p>
+ * 1. passing the data received by CallStreamObserver to Reactive consumer <br>
+ * 2. passing the request of Reactive API to CallStreamObserver
+ */
+public abstract class AbstractTripleReactorPublisher<T> extends
CancelableStreamObserver<T> implements Publisher<T>, Subscription {
+
+ private boolean canRequest;
+
+ private long requested;
+
+ // whether publisher has been subscribed
+ private final AtomicBoolean SUBSCRIBED = new AtomicBoolean();
+
+ private volatile Subscriber<? super T> downstream;
+
+ protected volatile CallStreamObserver<?> subscription;
+
+ private final AtomicBoolean HAS_SUBSCRIPTION = new AtomicBoolean();
+
+ // cancel status
+ private volatile boolean isCancelled;
+
+ // complete status
+ private volatile boolean isDone;
+
+ // to help bind TripleSubscriber
+ private volatile Consumer<CallStreamObserver<?>> onSubscribe;
+
+ private volatile Runnable shutdownHook;
+
+ private final AtomicBoolean CALLED_SHUT_DOWN_HOOK = new AtomicBoolean();
+
+ public AbstractTripleReactorPublisher() {
+ }
+
+ public AbstractTripleReactorPublisher(Consumer<CallStreamObserver<?>>
onSubscribe, Runnable shutdownHook) {
+ this.onSubscribe = onSubscribe;
+ this.shutdownHook = shutdownHook;
+ }
+
+ protected void onSubscribe(final CallStreamObserver<?> subscription) {
+ if (subscription != null && this.subscription == null &&
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
+ this.subscription = subscription;
+ subscription.disableAutoFlowControl();
+ if (onSubscribe != null) {
+ onSubscribe.accept(subscription);
+ }
+ return;
+ }
+
+ throw new IllegalStateException(getClass().getSimpleName() + "
supports only a single subscription");
+ }
+
+ @Override
+ public void onNext(T data) {
+ if (isDone || isCancelled) {
+ return;
+ }
+ downstream.onNext(data);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (isDone || isCancelled) {
+ return;
+ }
+ isDone = true;
+ downstream.onError(throwable);
+ doPostShutdown();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (isDone || isCancelled) {
+ return;
+ }
+ isDone = true;
+ downstream.onComplete();
+ doPostShutdown();
+ }
+
+ private void doPostShutdown() {
+ Runnable r = shutdownHook;
+ // CAS to confirm shutdownHook will be run only once.
+ if (r != null && CALLED_SHUT_DOWN_HOOK.compareAndSet(false, true)) {
+ shutdownHook = null;
+ r.run();
+ }
+ }
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber) {
+ if (subscriber == null) {
+ throw new NullPointerException();
+ }
+
+ if (SUBSCRIBED.compareAndSet(false, true)) {
+ subscriber.onSubscribe(this);
+ this.downstream = subscriber;
+ if (isCancelled) {
+ this.downstream = null;
+ }
+ }
+ }
+
+ @Override
+ public void request(long l) {
+ synchronized (this) {
+ if (SUBSCRIBED.get() && canRequest) {
+ subscription.request(l >= Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) l);
+ } else {
+ requested += l;
+ }
+ }
+ }
+
+ @Override
+ public void startRequest() {
+ synchronized (this) {
+ if (!canRequest) {
+ canRequest = true;
+ long count = requested;
+ subscription.request(count >= Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) count);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (isCancelled) {
+ return;
+ }
+ isCancelled = true;
+ doPostShutdown();
+ }
+
+ public boolean isCancelled() {
+ return isCancelled;
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
new file mode 100644
index 0000000000..40acec2e9b
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.util.annotation.NonNull;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The middle layer between {@link
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive
API. <br>
+ * Passing the data from Reactive producer to CallStreamObserver.
+ */
+public abstract class AbstractTripleReactorSubscriber<T> implements
Subscriber<T>, CoreSubscriber<T> {
+
+ private volatile boolean isCancelled;
+
+ protected volatile CallStreamObserver<T> downstream;
+
+ private final AtomicBoolean SUBSCRIBED = new AtomicBoolean();
+
+ private volatile Subscription subscription;
+
+ private final AtomicBoolean HAS_SUBSCRIBED = new AtomicBoolean();
+
+ // complete status
+ private volatile boolean isDone;
+
+ /**
+ * Binds the downstream and calls subscription#request(1).
+ *
+ * @param downstream downstream
+ */
+ public void subscribe(final CallStreamObserver<T> downstream) {
+ if (downstream == null) {
+ throw new NullPointerException();
+ }
+ if (this.downstream == null && SUBSCRIBED.compareAndSet(false, true)) {
+ this.downstream = downstream;
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(@NonNull final Subscription subscription) {
+ if (this.subscription == null && HAS_SUBSCRIBED.compareAndSet(false,
true)) {
+ this.subscription = subscription;
+ return;
+ }
+ // onSubscribe cannot be called repeatedly
+ subscription.cancel();
+ }
+
+ @Override
+ public void onNext(T t) {
+ if (!isDone && !isCanceled()) {
+ downstream.onNext(t);
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (!isCanceled()) {
+ isDone = true;
+ downstream.onError(throwable);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (!isCanceled()) {
+ isDone = true;
+ downstream.onCompleted();
+ }
+ }
+
+ public void cancel() {
+ if (!isCancelled && subscription != null) {
+ isCancelled = true;
+ subscription.cancel();
+ }
+ }
+
+ public boolean isCanceled() {
+ return isCancelled;
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
new file mode 100644
index 0000000000..1deab5ac0b
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+import java.util.function.Consumer;
+
+/**
+ * Used for OneToMany, ManyToOne and ManyToMany calls on the client. <br>
+ * It is a Publisher that the user subscriber subscribes to. <br>
+ * It is a StreamObserver for the responseStream. <br>
+ * It is a Subscription through which the user subscriber requests data; the
+ * request is passed on to the requestStream.
+ */
+public class ClientTripleReactorPublisher<T> extends
AbstractTripleReactorPublisher<T> {
+
+ /**
+ * Creates a publisher using the superclass no-argument constructor.
+ */
+ public ClientTripleReactorPublisher() {
+ }
+
+ /**
+ * Creates a publisher, forwarding both arguments unchanged to the
+ * superclass constructor.
+ *
+ * @param onSubscribe callback passed to the superclass
+ * @param shutdownHook hook passed to the superclass
+ */
+ public ClientTripleReactorPublisher(Consumer<CallStreamObserver<?>>
onSubscribe, Runnable shutdownHook) {
+ super(onSubscribe, shutdownHook);
+ }
+
+ /**
+ * Hands the client call adapter to the inherited onSubscribe.
+ *
+ * @param clientCallToObserverAdapter adapter passed to onSubscribe
+ */
+ @Override
+ public void beforeStart(ClientCallToObserverAdapter<T>
clientCallToObserverAdapter) {
+ super.onSubscribe(clientCallToObserverAdapter);
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
similarity index 60%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
copy to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
index 3a059bc5fb..3d5936d61f 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.tri;
+package org.apache.dubbo.rpc.protocol.tri.reactive;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
-public abstract class CancelableStreamObserver<T> implements StreamObserver<T>
{
-
- private CancellationContext cancellationContext;
-
- public void setCancellationContext(CancellationContext
cancellationContext) {
- this.cancellationContext = cancellationContext;
- }
+/**
+ * Client-side subscriber that subscribes to the user publisher and is itself
+ * subscribed to by the ClientStreamObserver.
+ */
+public class ClientTripleReactorSubscriber<T> extends
AbstractTripleReactorSubscriber<T> {
- public void cancel(Throwable throwable) {
- cancellationContext.cancel(throwable);
+    /**
+     * Cancels the upstream subscription and, when a downstream call has
+     * already been bound, cancels that call as well.
+     * <p>
+     * Does nothing if this subscriber is already cancelled.
+     */
+    @Override
+    public void cancel() {
+        if (isCanceled()) {
+            return;
+        }
+        super.cancel();
+        // downstream is assigned only in subscribe(); a cancel that arrives
+        // before the call is bound must not fail with a NullPointerException.
+        final ClientCallToObserverAdapter<T> call =
+            (ClientCallToObserverAdapter<T>) downstream;
+        if (call != null) {
+            call.cancel(new Exception("Cancelled"));
+        }
     }
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
similarity index 57%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
copy to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
index 3a059bc5fb..33addf5d65 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.tri;
+package org.apache.dubbo.rpc.protocol.tri.reactive;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
-public abstract class CancelableStreamObserver<T> implements StreamObserver<T>
{
-
- private CancellationContext cancellationContext;
-
- public void setCancellationContext(CancellationContext
cancellationContext) {
- this.cancellationContext = cancellationContext;
- }
+/**
+ * Used in ManyToOne and ManyToMany in server. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for requestStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to
responseStream.
+ */
+public class ServerTripleReactorPublisher<T> extends
AbstractTripleReactorPublisher<T> {
- public void cancel(Throwable throwable) {
- cancellationContext.cancel(throwable);
+ /**
+ * Creates the publisher and immediately passes the given observer to the
+ * inherited onSubscribe.
+ *
+ * @param callStreamObserver observer handed to onSubscribe
+ */
+ public ServerTripleReactorPublisher(CallStreamObserver<?>
callStreamObserver) {
+ super.onSubscribe(callStreamObserver);
}
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
new file mode 100644
index 0000000000..32453ac372
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+/**
+ * Server-side Subscriber that passes the data produced by the user publisher
+ * to the responseStream.
+ */
+public class ServerTripleReactorSubscriber<T> extends
AbstractTripleReactorSubscriber<T>{
+
+ /**
+ * Binds the downstream through the superclass and then, when the downstream
+ * is a CancelableStreamObserver, registers a listener on its
+ * CancellationContext that cancels this subscriber.
+ *
+ * @param downstream the response observer to forward signals to
+ */
+ @Override
+ public void subscribe(CallStreamObserver<T> downstream) {
+ super.subscribe(downstream);
+ if (downstream instanceof CancelableStreamObserver) {
+ CancelableStreamObserver<?> observer =
(CancelableStreamObserver<?>) downstream;
+ final CancellationContext context;
+ // reuse the observer's context, or create and install one if it has none
+ if (observer.getCancellationContext() == null) {
+ context = new CancellationContext();
+ observer.setCancellationContext(context);
+ } else {
+ context = observer.getCancellationContext();
+ }
+ context.addListener(ctx -> super.cancel());
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
new file mode 100644
index 0000000000..4ae6d45755
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.ClientTripleReactorPublisher;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.ClientTripleReactorSubscriber;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * A collection of methods to convert client-side Reactor calls to stream
calls.
+ */
+public final class ReactorClientCalls {
+
+ private ReactorClientCalls() {
+ }
+
+ /**
+ * Implements a unary -> unary call as Mono -> Mono.
+ * <p>
+ * The value emitted by the request Mono is passed to
+ * StubInvocationUtil#unaryCall; the response or error reported to the
+ * StreamObserver is relayed to the emitter of the returned Mono.
+ *
+ * @param invoker invoker
+ * @param monoRequest the mono with request
+ * @param methodDescriptor the method descriptor
+ * @return the mono with response, or an error Mono if building it throws
+ */
+ public static <TRequest, TResponse, TInvoker> Mono<TResponse>
oneToOne(Invoker<TInvoker> invoker,
+
Mono<TRequest> monoRequest,
+
StubMethodDescriptor methodDescriptor) {
+ try {
+ return Mono.create(emitter -> monoRequest.subscribe(
+ request -> StubInvocationUtil.unaryCall(invoker,
methodDescriptor, request, new StreamObserver<TResponse>() {
+ @Override
+ public void onNext(TResponse tResponse) {
+ emitter.success(tResponse);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ emitter.error(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Do nothing
+ }
+ }),
+ // an error from the request Mono itself is relayed to the emitter
+ emitter::error
+ ));
+ } catch (Throwable throwable) {
+ return Mono.error(throwable);
+ }
+ }
+
+ /**
+ * Implements a unary -> stream call as Mono -> Flux.
+ * <p>
+ * For the emitted request a new ClientTripleReactorPublisher is created,
+ * passed to StubInvocationUtil#serverStreamCall as the response observer and
+ * returned as the response stream.
+ *
+ * @param invoker invoker
+ * @param monoRequest the mono with request
+ * @param methodDescriptor the method descriptor
+ * @return the flux with response, or an error Flux if building it throws
+ */
+ public static <TRequest, TResponse, TInvoker> Flux<TResponse>
oneToMany(Invoker<TInvoker> invoker,
+
Mono<TRequest> monoRequest,
+
StubMethodDescriptor methodDescriptor) {
+ try {
+ return monoRequest
+ .flatMapMany(request -> {
+ ClientTripleReactorPublisher<TResponse> clientPublisher =
new ClientTripleReactorPublisher<>();
+ StubInvocationUtil.serverStreamCall(invoker,
methodDescriptor, request, clientPublisher);
+ return clientPublisher;
+ });
+ } catch (Throwable throwable) {
+ return Flux.error(throwable);
+ }
+ }
+
+ /**
+ * Implements a stream -> unary call as Flux -> Mono.
+ * <p>
+ * The request Flux is subscribed with a ClientTripleReactorSubscriber as
+ * soon as this method runs; the call itself is started from doOnSubscribe
+ * of the returned Mono.
+ *
+ * @param invoker invoker
+ * @param requestFlux the flux with request
+ * @param methodDescriptor the method descriptor
+ * @return the mono with response, or an error Mono if building it throws
+ */
+ public static <TRequest, TResponse, TInvoker> Mono<TResponse>
manyToOne(Invoker<TInvoker> invoker,
+
Flux<TRequest> requestFlux,
+
StubMethodDescriptor methodDescriptor) {
+ try {
+ ClientTripleReactorSubscriber<TRequest> clientSubscriber =
requestFlux.subscribeWith(new ClientTripleReactorSubscriber<>());
+ // first argument binds the request subscriber to the observer it is
+ // given; second argument cancels the request subscriber
+ ClientTripleReactorPublisher<TResponse> clientPublisher = new
ClientTripleReactorPublisher<>(
+ s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>)
s),
+ clientSubscriber::cancel);
+ return Mono.from(clientPublisher).doOnSubscribe(dummy ->
+ StubInvocationUtil.biOrClientStreamCall(invoker,
methodDescriptor, clientPublisher));
+ } catch (Throwable throwable) {
+ return Mono.error(throwable);
+ }
+ }
+
+ /**
+ * Implements a stream -> stream call as Flux -> Flux.
+ * <p>
+ * The request Flux is subscribed with a ClientTripleReactorSubscriber as
+ * soon as this method runs; the call itself is started from doOnSubscribe
+ * of the returned Flux.
+ *
+ * @param invoker invoker
+ * @param requestFlux the flux with request
+ * @param methodDescriptor the method descriptor
+ * @return the flux with response, or an error Flux if building it throws
+ */
+ public static <TRequest, TResponse, TInvoker> Flux<TResponse>
manyToMany(Invoker<TInvoker> invoker,
+
Flux<TRequest> requestFlux,
+
StubMethodDescriptor methodDescriptor) {
+ try {
+ ClientTripleReactorSubscriber<TRequest> clientSubscriber =
requestFlux.subscribeWith(new ClientTripleReactorSubscriber<>());
+ // first argument binds the request subscriber to the observer it is
+ // given; second argument cancels the request subscriber
+ ClientTripleReactorPublisher<TResponse> clientPublisher = new
ClientTripleReactorPublisher<>(
+ s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>)
s),
+ clientSubscriber::cancel);
+ return Flux.from(clientPublisher).doOnSubscribe(dummy ->
+ StubInvocationUtil.biOrClientStreamCall(invoker,
methodDescriptor, clientPublisher));
+ } catch (Throwable throwable) {
+ return Flux.error(throwable);
+ }
+ }
+
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
new file mode 100644
index 0000000000..6454d26ba3
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorPublisher;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A collection of methods to convert server-side stream calls to Reactor
calls.
+ */
+public final class ReactorServerCalls {
+
+ private ReactorServerCalls() {
+ }
+
+    /**
+     * Implements a unary -> unary call as Mono -> Mono.
+     * <p>
+     * The value produced by {@code func} is sent to {@code responseObserver}
+     * followed by {@code onCompleted()}. An error signalled by the returned
+     * Mono, or thrown while invoking {@code func}, is reported through
+     * {@code responseObserver.onError} so the caller is not left waiting.
+     *
+     * @param request          request
+     * @param responseObserver response StreamObserver
+     * @param func             service implementation
+     */
+    public static <T, R> void oneToOne(T request,
+                                       StreamObserver<R> responseObserver,
+                                       Function<Mono<T>, Mono<R>> func) {
+        try {
+            func.apply(Mono.just(request)).subscribe(
+                res -> {
+                    responseObserver.onNext(res);
+                    responseObserver.onCompleted();
+                },
+                // Without an error consumer a failing Mono would never reach
+                // the response observer.
+                responseObserver::onError);
+        } catch (Throwable throwable) {
+            // func itself threw before a Mono could be subscribed
+            responseObserver.onError(throwable);
+        }
+    }
+
+ /**
+ * Implements a unary -> stream call as Mono -> Flux.
+ * <p>
+ * The Flux returned by func is subscribed with a
+ * ServerTripleReactorSubscriber, which is then bound to responseObserver.
+ * Anything thrown along the way, including a failed cast of
+ * responseObserver, is reported through responseObserver.onError.
+ *
+ * @param request request
+ * @param responseObserver response StreamObserver; cast to
+ * ServerCallToObserverAdapter
+ * @param func service implementation
+ */
+ public static <T, R> void oneToMany(T request,
+ StreamObserver<R> responseObserver,
+ Function<Mono<T>, Flux<R>> func) {
+ try {
+ Flux<R> response = func.apply(Mono.just(request));
+ ServerTripleReactorSubscriber<R> subscriber =
response.subscribeWith(new ServerTripleReactorSubscriber<>());
+ subscriber.subscribe((ServerCallToObserverAdapter<R>)
responseObserver);
+ } catch (Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+ }
+
+ /**
+ * Implements a stream -> unary call as Flux -> Mono.
+ * <p>
+ * A ServerTripleReactorPublisher is created around responseObserver and
+ * exposed to func as the request Flux; the same publisher is returned as the
+ * request StreamObserver. The cast of responseObserver happens before the
+ * try block, so a failed cast propagates to the caller.
+ *
+ * @param responseObserver response StreamObserver; cast to CallStreamObserver
+ * @param func service implementation
+ * @return request StreamObserver
+ */
+ public static <T, R> StreamObserver<T> manyToOne(StreamObserver<R>
responseObserver,
+ Function<Flux<T>,
Mono<R>> func) {
+ ServerTripleReactorPublisher<T> serverPublisher = new
ServerTripleReactorPublisher<T>((CallStreamObserver<R>) responseObserver);
+ try {
+ Mono<R> responseMono = func.apply(Flux.from(serverPublisher));
+ responseMono.subscribe(value -> {
+ // Don't try to respond if the server has already canceled
the request
+ if (!serverPublisher.isCancelled()) {
+ responseObserver.onNext(value);
+ }
+ },
+ throwable -> {
+ // Don't try to respond if the server has already canceled
the request
+ if (!serverPublisher.isCancelled()) {
+ responseObserver.onError(throwable);
+ }
+ },
+ // completion is forwarded without checking the cancelled state
+ responseObserver::onCompleted
+ );
+ // called only after the response Mono has been subscribed
+ serverPublisher.startRequest();
+ } catch (Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+ return serverPublisher;
+ }
+
+ /**
+ * Implements a stream -> stream call as Flux -> Flux.
+ * <p>
+ * A ServerTripleReactorPublisher is created around responseObserver and
+ * exposed to func as the request Flux; the Flux returned by func is
+ * subscribed with a ServerTripleReactorSubscriber bound to responseObserver.
+ * The first cast of responseObserver happens before the try block, so a
+ * failed cast propagates to the caller.
+ *
+ * @param responseObserver response StreamObserver; cast to CallStreamObserver
+ * @param func service implementation
+ * @return request StreamObserver
+ */
+ public static <T, R> StreamObserver<T> manyToMany(StreamObserver<R>
responseObserver,
+ Function<Flux<T>,
Flux<R>> func) {
+ // responseObserver is also a subscription of publisher, we can use it
to request more data
+ ServerTripleReactorPublisher<T> serverPublisher = new
ServerTripleReactorPublisher<T>((CallStreamObserver<R>) responseObserver);
+ try {
+ Flux<R> responseFlux = func.apply(Flux.from(serverPublisher));
+ ServerTripleReactorSubscriber<R> serverSubscriber =
responseFlux.subscribeWith(new ServerTripleReactorSubscriber<>());
+ serverSubscriber.subscribe((CallStreamObserver<R>)
responseObserver);
+ // called only after the response side has been wired up
+ serverPublisher.startRequest();
+ } catch (Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+
+ return serverPublisher;
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
new file mode 100644
index 0000000000..df583f6711
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Stub method handler that adapts a service method expressed as
+ * {@code Flux -> Flux} to the stub invocation contract.
+ */
+public class ManyToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Flux<T>, Flux<R>> func;
+
+    /**
+     * @param func the service implementation to delegate to
+     */
+    public ManyToManyMethodHandler(Function<Flux<T>, Flux<R>> func) {
+        this.func = func;
+    }
+
+    /**
+     * Delegates to {@link ReactorServerCalls#manyToMany}.
+     *
+     * @param arguments {@code arguments[0]} is cast to the response
+     *                  {@code CallStreamObserver}
+     * @return an already completed future holding the request observer
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        return CompletableFuture.completedFuture(
+            ReactorServerCalls.manyToMany((CallStreamObserver<R>) arguments[0], func));
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
new file mode 100644
index 0000000000..133f92a29b
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Stub method handler that adapts a service method expressed as
+ * {@code Flux -> Mono} to the stub invocation contract.
+ */
+public class ManyToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Flux<T>, Mono<R>> func;
+
+    /**
+     * @param func the service implementation to delegate to
+     */
+    public ManyToOneMethodHandler(Function<Flux<T>, Mono<R>> func) {
+        this.func = func;
+    }
+
+    /**
+     * Delegates to {@link ReactorServerCalls#manyToOne}.
+     *
+     * @param arguments {@code arguments[0]} is cast to the response
+     *                  {@code CallStreamObserver}
+     * @return an already completed future holding the request observer
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        return CompletableFuture.completedFuture(
+            ReactorServerCalls.manyToOne((CallStreamObserver<R>) arguments[0], func));
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
new file mode 100644
index 0000000000..ef94d6d80e
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Stub method handler that adapts a service method expressed as
+ * {@code Mono -> Flux} to the stub invocation contract.
+ */
+public class OneToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Mono<T>, Flux<R>> func;
+
+    /**
+     * @param func the service implementation to delegate to
+     */
+    public OneToManyMethodHandler(Function<Mono<T>, Flux<R>> func) {
+        this.func = func;
+    }
+
+    /**
+     * Delegates to {@link ReactorServerCalls#oneToMany}.
+     *
+     * @param arguments {@code arguments[0]} is cast to the request,
+     *                  {@code arguments[1]} to the response observer
+     * @return an already completed future holding {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<?> invoke(Object[] arguments) {
+        ReactorServerCalls.oneToMany(
+            (T) arguments[0], (StreamObserver<R>) arguments[1], func);
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
new file mode 100644
index 0000000000..0a8b0a7191
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.FutureToObserverAdaptor;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Stub method handler that adapts a service method expressed as
+ * {@code Mono -> Mono} to the stub invocation contract.
+ */
+public class OneToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Mono<T>, Mono<R>> func;
+
+    /**
+     * @param func the service implementation to delegate to
+     */
+    public OneToOneMethodHandler(Function<Mono<T>, Mono<R>> func) {
+        this.func = func;
+    }
+
+    /**
+     * Delegates to {@link ReactorServerCalls#oneToOne}, using a
+     * {@link FutureToObserverAdaptor} around the returned future as the
+     * response observer.
+     *
+     * @param arguments {@code arguments[0]} is cast to the request
+     * @return the future wrapped by the response observer
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<R> invoke(Object[] arguments) {
+        final T request = (T) arguments[0];
+        final CompletableFuture<R> result = new CompletableFuture<>();
+        final StreamObserver<R> observer = new FutureToObserverAdaptor<>(result);
+        ReactorServerCalls.oneToOne(request, observer, func);
+        return result;
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
new file mode 100644
index 0000000000..6669eaa7d9
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToManyMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for ManyToManyMethodHandler.
+ * <p>
+ * Feeds ten elements through a handler that appends "0" to each of them and
+ * checks how often each callback of the mocked response observer was invoked.
+ */
+public final class ManyToManyMethodHandlerTest {
+
+ @Test
+ public void testInvoke() throws ExecutionException, InterruptedException {
+ // one counter per callback of the mocked response observer
+ AtomicInteger nextCounter = new AtomicInteger();
+ AtomicInteger completeCounter = new AtomicInteger();
+ AtomicInteger errorCounter = new AtomicInteger();
+ ServerCallToObserverAdapter<String> responseObserver =
Mockito.mock(ServerCallToObserverAdapter.class);
+ doAnswer(o -> nextCounter.incrementAndGet())
+ .when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet())
+ .when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet())
+ .when(responseObserver).onError(any(Throwable.class));
+ ManyToManyMethodHandler<String, String> handler = new
ManyToManyMethodHandler<>(requestFlux ->
+ requestFlux.map(r -> r + "0"));
+ CompletableFuture<StreamObserver<String>> future = handler.invoke(new
Object[]{responseObserver});
+ StreamObserver<String> requestObserver = future.get();
+ // push ten request elements, then complete the request stream
+ for (int i = 0; i < 10; i++) {
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted();
+ // expect ten responses, no error and exactly one completion
+ Assertions.assertEquals(10, nextCounter.get());
+ Assertions.assertEquals(0, errorCounter.get());
+ Assertions.assertEquals(1, completeCounter.get());
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
new file mode 100644
index 0000000000..6cdde8d251
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToOneMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for ManyToOneMethodHandler: the handler function parses each request as an Integer, sums them and returns the sum as a String; a mocked response observer counts the callbacks it receives.
+ */
+public final class ManyToOneMethodHandlerTest {
+
+ @Test // happy path: ten requests reduced to a single response
+ public void testInvoker() throws ExecutionException, InterruptedException {
+ AtomicInteger nextCounter = new AtomicInteger(); // incremented by the stubbed onNext(String)
+ AtomicInteger completeCounter = new AtomicInteger(); // incremented by the stubbed onCompleted()
+ AtomicInteger errorCounter = new AtomicInteger(); // incremented by the stubbed onError(Throwable)
+ ServerCallToObserverAdapter<String> responseObserver =
Mockito.mock(ServerCallToObserverAdapter.class);
+ doAnswer(o -> nextCounter.incrementAndGet())
+ .when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet())
+ .when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet())
+ .when(responseObserver).onError(any(Throwable.class));
+ ManyToOneMethodHandler<String, String> handler = new
ManyToOneMethodHandler<>(requestFlux ->
+
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
+ CompletableFuture<StreamObserver<String>> future = handler.invoke(new
Object[]{responseObserver});
+ StreamObserver<String> requestObserver = future.get(); // the test uses this observer as the request side of the call
+ for (int i = 0; i < 10; i++) { // send the requests "0" through "9"
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted(); // signal the end of the request stream
+ Assertions.assertEquals(1, nextCounter.get()); // a single reduced response is expected
+ Assertions.assertEquals(0, errorCounter.get()); // no error is expected
+ Assertions.assertEquals(1, completeCounter.get()); // exactly one completion is expected
+ }
+
+ @Test // error path: an error is injected part-way through the request stream
+ public void testError() throws ExecutionException, InterruptedException {
+ AtomicInteger nextCounter = new AtomicInteger(); // incremented by the stubbed onNext(String)
+ AtomicInteger completeCounter = new AtomicInteger(); // incremented by the stubbed onCompleted()
+ AtomicInteger errorCounter = new AtomicInteger(); // incremented by the stubbed onError(Throwable)
+ ServerCallToObserverAdapter<String> responseObserver =
Mockito.mock(ServerCallToObserverAdapter.class);
+ doAnswer(o -> nextCounter.incrementAndGet())
+ .when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet())
+ .when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet())
+ .when(responseObserver).onError(any(Throwable.class));
+ ManyToOneMethodHandler<String, String> handler = new
ManyToOneMethodHandler<>(requestFlux ->
+
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
+ CompletableFuture<StreamObserver<String>> future = handler.invoke(new
Object[]{responseObserver});
+ StreamObserver<String> requestObserver = future.get(); // the test uses this observer as the request side of the call
+ for (int i = 0; i < 10; i++) {
+ if (i == 6) { // inject an error after "0".."5" have been sent
+ requestObserver.onError(new Throwable());
+ } // no else/break: onNext("6") through onNext("9") are still called after the error
+ requestObserver.onNext(String.valueOf(i));
+ }
+ requestObserver.onCompleted(); // also called after the error. NOTE(review): presumably deliberate, to check late signals are ignored - confirm
+ Assertions.assertEquals(0, nextCounter.get()); // no response is expected
+ Assertions.assertEquals(1, errorCounter.get()); // exactly one error is expected
+ Assertions.assertEquals(0, completeCounter.get()); // no completion is expected despite the onCompleted() call above
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
new file mode 100644
index 0000000000..42b64536f1
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToManyMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for OneToManyMethodHandler: invokes the handler with one request plus a mocked response observer and counts the onNext / onCompleted / onError callbacks the observer receives.
+ */
+public final class OneToManyMethodHandlerTest {
+
+ @Test // happy path: one comma-separated request fans out into seven responses
+ public void testInvoke() {
+ String request = "1,2,3,4,5,6,7"; // seven comma-separated items
+ AtomicInteger nextCounter = new AtomicInteger(); // incremented by the stubbed onNext(String)
+ AtomicInteger completeCounter = new AtomicInteger(); // incremented by the stubbed onCompleted()
+ AtomicInteger errorCounter = new AtomicInteger(); // incremented by the stubbed onError(Throwable)
+ ServerCallToObserverAdapter<String> responseObserver =
Mockito.mock(ServerCallToObserverAdapter.class);
+ doAnswer(o -> nextCounter.incrementAndGet())
+ .when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet())
+ .when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet())
+ .when(responseObserver).onError(any(Throwable.class));
+ OneToManyMethodHandler<String, String> handler = new
OneToManyMethodHandler<>(requestMono ->
+ requestMono.flatMapMany(r -> Flux.fromArray(r.split(",")))); // function under test: split the request on "," and emit each piece
+ CompletableFuture<?> future = handler.invoke(new Object[]{request,
responseObserver});
+ Assertions.assertTrue(future.isDone()); // the returned future is expected to be complete already
+ Assertions.assertEquals(7, nextCounter.get()); // one response is expected per comma-separated item
+ Assertions.assertEquals(0, errorCounter.get()); // no error is expected
+ Assertions.assertEquals(1, completeCounter.get()); // exactly one completion is expected
+ }
+
+ @Test // error path: the handler function itself emits an error mid-stream
+ public void testError() {
+ String request = "1,2,3,4,5,6,7"; // passed to invoke, but the function below never reads it
+ AtomicInteger nextCounter = new AtomicInteger(); // incremented by the stubbed onNext(String)
+ AtomicInteger completeCounter = new AtomicInteger(); // incremented by the stubbed onCompleted()
+ AtomicInteger errorCounter = new AtomicInteger(); // incremented by the stubbed onError(Throwable)
+ ServerCallToObserverAdapter<String> responseObserver =
Mockito.mock(ServerCallToObserverAdapter.class);
+ doAnswer(o -> nextCounter.incrementAndGet())
+ .when(responseObserver).onNext(anyString());
+ doAnswer(o -> completeCounter.incrementAndGet())
+ .when(responseObserver).onCompleted();
+ doAnswer(o -> errorCounter.incrementAndGet())
+ .when(responseObserver).onError(any(Throwable.class));
+ OneToManyMethodHandler<String, String> handler = new
OneToManyMethodHandler<>(requestMono ->
+ Flux.create(emitter -> { // requestMono is ignored; the function produces its own sequence
+ for (int i = 0; i < 10; i++) {
+ if (i == 6) { // after "0".."5" have been emitted
+ emitter.error(new Throwable()); // the loop is not exited, so next("7").."9" are still attempted afterwards
+ } else {
+ emitter.next(String.valueOf(i));
+ }
+ }
+ }));
+ CompletableFuture<?> future = handler.invoke(new Object[]{request,
responseObserver});
+ Assertions.assertTrue(future.isDone()); // the returned future is expected to be complete already
+ Assertions.assertEquals(6, nextCounter.get()); // only "0".."5" are expected to reach the observer
+ Assertions.assertEquals(1, errorCounter.get()); // exactly one error is expected
+ Assertions.assertEquals(0, completeCounter.get()); // no completion is expected
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
new file mode 100644
index 0000000000..5ba898ca8d
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToOneMethodHandler;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for OneToOneMethodHandler: invokes the handler with a single request and checks the value carried by the returned future.
+ */
+public final class OneToOneMethodHandlerTest {
+
+ @Test // happy path: one request in, one transformed value out
+ public void testInvoke() throws ExecutionException, InterruptedException {
+ String request = "request";
+ OneToOneMethodHandler<String, String> handler = new
OneToOneMethodHandler<>(requestMono ->
+ requestMono.map(r -> r + "Test")); // function under test: append "Test" to the request
+ CompletableFuture<?> future = handler.invoke(new Object[]{request}); // the request is the only invoke argument here
+ assertEquals("requestTest", future.get()); // the future is expected to hold the mapped request
+ }
+}