This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new e069a7af9e Support dubbo reactive stream with project reactor (#10290)
e069a7af9e is described below

commit e069a7af9e2d5b920b63845d5228b6b3deca1a8c
Author: Kunshuai Zhu <[email protected]>
AuthorDate: Mon Aug 15 10:44:48 2022 +0800

    Support dubbo reactive stream with project reactor (#10290)
    
    * Support dubbo reactive stream with project reactor
    
    * basically successfully connected
    
    * Solve NPE in first calling CallStreamObserver#request(int)
    
    * Polish code, add comment and license
    
    * Optimize dependency configuration
    
    * Add manyToMany and oneToOne
    
    * Add manyToOne
    
    * Add ReactorDubbo3TripleGenerator
    
    * remove unnecessary commits
    
    * Add lisence header
    
    * Add comment
    
    * remove unused import
    
    * fix npe in TripleInvoker
    
    * Optimize manyToOne
    
    * Polish
    
    * polish
    
    * Fix reduce opration
    
    * Add unit test
    
    * Refactor TripleInvoker#invokeUnary to use UnaryClientCallListener
    
    * merge CancelableStreamObserver and ClientResponseObserver
    
    * discard repeated subscribe silently
    
    * remove AtomicReferenceFieldUpdater
    
    * remove unused volatile
    
    * polish
    
    * polish
    
    * move SafeRequestStreamObserver to CancelableStreamObserver
    
    * remove SafeRequestObserver
    
    * set onStartConsumer empty defultly
    
    * Fix unexpected invoke before subscribe in oneToMany & manyToOne
---
 .../tri/reactive/ReactorDubbo3TripleGenerator.java |  62 +++++++
 .../ReactorDubbo3TripleInterfaceStub.mustache      |  41 +++++
 .../resources/ReactorDubbo3TripleStub.mustache     | 180 +++++++++++++++++++++
 dubbo-rpc/dubbo-rpc-triple/pom.xml                 |  21 ++-
 .../rpc/protocol/tri/CancelableStreamObserver.java |  12 ++
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  14 +-
 .../call/ObserverToClientCallListenerAdapter.java  |   8 +
 .../tri/observer/ServerCallToObserverAdapter.java  |   2 +-
 .../reactive/AbstractTripleReactorPublisher.java   | 169 +++++++++++++++++++
 .../reactive/AbstractTripleReactorSubscriber.java  | 106 ++++++++++++
 .../tri/reactive/ClientTripleReactorPublisher.java |  44 +++++
 .../ClientTripleReactorSubscriber.java}            |  25 ++-
 .../ServerTripleReactorPublisher.java}             |  24 ++-
 .../reactive/ServerTripleReactorSubscriber.java    |  44 +++++
 .../tri/reactive/calls/ReactorClientCalls.java     | 143 ++++++++++++++++
 .../tri/reactive/calls/ReactorServerCalls.java     | 136 ++++++++++++++++
 .../reactive/handler/ManyToManyMethodHandler.java  |  47 ++++++
 .../reactive/handler/ManyToOneMethodHandler.java   |  48 ++++++
 .../reactive/handler/OneToManyMethodHandler.java   |  48 ++++++
 .../reactive/handler/OneToOneMethodHandler.java    |  49 ++++++
 .../tri/reactive/ManyToManyMethodHandlerTest.java  |  64 ++++++++
 .../tri/reactive/ManyToOneMethodHandlerTest.java   |  92 +++++++++++
 .../tri/reactive/OneToManyMethodHandlerTest.java   |  90 +++++++++++
 .../tri/reactive/OneToOneMethodHandlerTest.java    |  41 +++++
 24 files changed, 1473 insertions(+), 37 deletions(-)

diff --git 
a/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
 
b/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
new file mode 100644
index 0000000000..dfc8d86e61
--- /dev/null
+++ 
b/dubbo-compiler/src/main/java/org/apache/dubbo/gen/tri/reactive/ReactorDubbo3TripleGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.gen.tri.reactive;
+
+import com.salesforce.jprotoc.ProtocPlugin;
+import org.apache.dubbo.gen.AbstractGenerator;
+
+public class ReactorDubbo3TripleGenerator extends AbstractGenerator {
+
+    public static void main(String[] args) {
+        if (args.length == 0) {
+            ProtocPlugin.generate(new ReactorDubbo3TripleGenerator());
+        } else {
+            ProtocPlugin.debug(new ReactorDubbo3TripleGenerator(), args[0]);
+        }
+    }
+
+    @Override
+    protected String getClassPrefix() {
+        return "Dubbo";
+    }
+
+    @Override
+    protected String getClassSuffix() {
+        return "Triple";
+    }
+
+    @Override
+    protected String getTemplateFileName() {
+        return "ReactorDubbo3TripleStub.mustache";
+    }
+
+    @Override
+    protected String getInterfaceTemplateFileName() {
+        return "ReactorDubbo3TripleInterfaceStub.mustache";
+    }
+
+    @Override
+    protected String getSingleTemplateFileName() {
+        throw new IllegalStateException("Do not support single template!");
+    }
+
+    @Override
+    protected boolean enableMultipleTemplateFiles() {
+        return true;
+    }
+}
diff --git 
a/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache 
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache
new file mode 100644
index 0000000000..b3b900831e
--- /dev/null
+++ 
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleInterfaceStub.mustache
@@ -0,0 +1,41 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface {{interfaceClassName}} {
+
+    String JAVA_SERVICE_NAME = "{{packageName}}.{{serviceName}}";
+
+    String SERVICE_NAME = "{{commonPackageName}}.{{serviceName}}";
+
+{{#methods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    {{#deprecated}}
+        @java.lang.Deprecated
+    {{/deprecated}}
+    
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
 
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
 reactorRequest) ;
+
+{{/methods}}
+}
diff --git a/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache 
b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
new file mode 100644
index 0000000000..43b715e697
--- /dev/null
+++ b/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
@@ -0,0 +1,180 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+{{#packageName}}
+package {{packageName}};
+{{/packageName}}
+
+import com.google.protobuf.Message;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.PathResolver;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.ServerService;
+import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToManyMethodHandler;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToOneMethodHandler;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToManyMethodHandler;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorClientCalls;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToOneMethodHandler;
+
+import org.apache.dubbo.rpc.stub.StubInvoker;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import org.apache.dubbo.rpc.stub.StubSuppliers;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class {{className}} {
+
+    private {{className}}() {}
+
+    public static final String SERVICE_NAME = 
{{interfaceClassName}}.SERVICE_NAME;
+
+    private static final StubServiceDescriptor serviceDescriptor = new 
StubServiceDescriptor(SERVICE_NAME,{{interfaceClassName}}.class);
+
+    static {
+        StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub);
+        StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME,  
{{className}}::newStub);
+        StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor);
+        StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME, 
serviceDescriptor);
+    }
+
+    @SuppressWarnings("all")
+    public static {{interfaceClassName}} newStub(Invoker<?> invoker) {
+        return new 
{{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker);
+    }
+
+{{#unaryMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.UNARY,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/unaryMethods}}
+
+{{#serverStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.SERVER_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/serverStreamingMethods}}
+
+{{#clientStreamingMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.CLIENT_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/clientStreamingMethods}}
+
+{{#biStreamingWithoutClientStreamMethods}}
+    {{#javaDoc}}
+        {{{javaDoc}}}
+    {{/javaDoc}}
+    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
+        {{inputType}}.class, {{outputType}}.class, serviceDescriptor, 
MethodDescriptor.RpcType.BI_STREAM,
+        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
+        {{outputType}}::parseFrom);
+{{/biStreamingWithoutClientStreamMethods}}
+
+    public static class {{interfaceClassName}}Stub implements 
{{interfaceClassName}}{
+
+        private final Invoker<{{interfaceClassName}}> invoker;
+
+        public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> 
invoker) {
+            this.invoker = invoker;
+        }
+
+    {{#methods}}
+        {{#javaDoc}}
+            {{{javaDoc}}}
+        {{/javaDoc}}
+        {{#deprecated}}
+            @java.lang.Deprecated
+        {{/deprecated}}
+        public 
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
 
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
 request) {
+            return ReactorClientCalls.{{reactiveCallsMethodName}}(invoker, 
request, {{methodNameCamelCase}}Method);
+        }
+    {{/methods}}
+    }
+
+    public static abstract class {{interfaceClassName}}ImplBase implements 
{{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
+
+        @Override
+        public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
+            PathResolver pathResolver = url.getOrDefaultFrameworkModel()
+            .getExtensionLoader(PathResolver.class)
+            .getDefaultExtension();
+            Map<String,StubMethodHandler<?, ?>> handlers = new HashMap<>();
+
+            {{#methods}}
+                pathResolver.addNativeStub( "/" + SERVICE_NAME + 
"/{{originMethodName}}" );
+            {{/methods}}
+
+            {{#unaryMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new 
OneToOneMethodHandler<>(this::{{methodName}}));
+            {{/unaryMethods}}
+            {{#serverStreamingMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new 
OneToManyMethodHandler<>(this::{{methodName}}));
+            {{/serverStreamingMethods}}
+            {{#clientStreamingMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new 
ManyToOneMethodHandler<>(this::{{methodName}}));
+            {{/clientStreamingMethods}}
+            {{#biStreamingWithoutClientStreamMethods}}
+                handlers.put({{methodName}}Method.getMethodName(), new 
ManyToManyMethodHandler<>(this::{{methodName}}));
+            {{/biStreamingWithoutClientStreamMethods}}
+
+            return new StubInvoker<>(this, url, {{interfaceClassName}}.class, 
handlers);
+        }
+
+    {{#methods}}
+        {{#javaDoc}}
+            {{{javaDoc}}}
+        {{/javaDoc}}
+        {{#deprecated}}
+            @java.lang.Deprecated
+        {{/deprecated}}
+        public 
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
 
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
 request) {
+            throw unimplementedMethodException({{methodName}}Method);
+        }
+    {{/methods}}
+
+        @Override
+        public final ServiceDescriptor getServiceDescriptor() {
+            return serviceDescriptor;
+        }
+
+        private RpcException unimplementedMethodException(StubMethodDescriptor 
methodDescriptor) {
+            return 
TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is 
unimplemented",
+            "/" + serviceDescriptor.getInterfaceName() + "/" + 
methodDescriptor.getMethodName())).asException();
+        }
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/pom.xml 
b/dubbo-rpc/dubbo-rpc-triple/pom.xml
index 9ff3afcbae..81e46d1cda 100644
--- a/dubbo-rpc/dubbo-rpc-triple/pom.xml
+++ b/dubbo-rpc/dubbo-rpc-triple/pom.xml
@@ -29,8 +29,23 @@
     <description>The triple protocol module</description>
     <properties>
         <skip_maven_deploy>false</skip_maven_deploy>
+        <dubbo.compiler.version>0.0.4.1-SNAPSHOT</dubbo.compiler.version>
+        <reactive.version>1.0.4</reactive.version>
+        <reactor.version>3.4.19</reactor.version>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>org.reactivestreams</groupId>
+            <artifactId>reactive-streams</artifactId>
+            <version>${reactive.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>${reactor.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-rpc-api</artifactId>
@@ -63,12 +78,6 @@
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-core</artifactId>
-            <version>3.4.16</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 3a059bc5fb..76ae697ed4 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
 
 public abstract class CancelableStreamObserver<T> implements StreamObserver<T> 
{
 
@@ -28,8 +29,19 @@ public abstract class CancelableStreamObserver<T> implements 
StreamObserver<T> {
         this.cancellationContext = cancellationContext;
     }
 
+    public CancellationContext getCancellationContext() {
+        return cancellationContext;
+    }
+
     public void cancel(Throwable throwable) {
         cancellationContext.cancel(throwable);
     }
 
+    public void beforeStart(final ClientCallToObserverAdapter<T> 
clientCallToObserverAdapter) {
+        // do nothing
+    }
+
+    public void startRequest() {
+        // do nothing
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index ce2ee44066..e358efffca 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -46,6 +46,7 @@ import 
org.apache.dubbo.rpc.protocol.tri.call.ObserverToClientCallListenerAdapte
 import org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
 import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
 import io.netty.util.AsciiString;
@@ -174,14 +175,18 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
     StreamObserver<Object> streamCall(ClientCall call,
         RequestMetadata metadata,
         StreamObserver<Object> responseObserver) {
+        ObserverToClientCallListenerAdapter listener = new 
ObserverToClientCallListenerAdapter(
+            responseObserver);
+        StreamObserver<Object> streamObserver = call.start(metadata, listener);
         if (responseObserver instanceof CancelableStreamObserver) {
             final CancellationContext context = new CancellationContext();
-            ((CancelableStreamObserver<Object>) 
responseObserver).setCancellationContext(context);
+            CancelableStreamObserver<Object> cancelableStreamObserver = 
(CancelableStreamObserver<Object>) responseObserver;
+            cancelableStreamObserver.setCancellationContext(context);
             context.addListener(context1 -> call.cancelByLocal(new 
IllegalStateException("Canceled by app")));
+            listener.setOnStartConsumer(dummy -> 
cancelableStreamObserver.startRequest());
+            
cancelableStreamObserver.beforeStart((ClientCallToObserverAdapter<Object>) 
streamObserver);
         }
-        ObserverToClientCallListenerAdapter listener = new 
ObserverToClientCallListenerAdapter(
-            responseObserver);
-        return call.start(metadata, listener);
+        return streamObserver;
     }
 
     AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation 
invocation,
@@ -204,6 +209,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         }
         result = new AsyncRpcResult(future, invocation);
         FutureContext.getContext().setCompatibleFuture(future);
+
         result.setExecutor(callbackExecutor);
         ClientCall.Listener callListener = new UnaryClientCallListener(future);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 5fac300f6f..1c934ce970 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -21,16 +21,22 @@ import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.TriRpcStatus;
 
 import java.util.Map;
+import java.util.function.Consumer;
 
 public class ObserverToClientCallListenerAdapter implements 
ClientCall.Listener {
 
     private final StreamObserver<Object> delegate;
     private ClientCall call;
+    private Consumer<ClientCall> onStartConsumer = clientCall -> { };
 
     public ObserverToClientCallListenerAdapter(StreamObserver<Object> 
delegate) {
         this.delegate = delegate;
     }
 
+    public void setOnStartConsumer(Consumer<ClientCall> onStartConsumer) {
+        this.onStartConsumer = onStartConsumer;
+    }
+
     @Override
     public void onMessage(Object message) {
         delegate.onNext(message);
@@ -54,5 +60,7 @@ public class ObserverToClientCallListenerAdapter implements 
ClientCall.Listener
         if (call.isAutoRequest()) {
             call.request(1);
         }
+
+        onStartConsumer.accept(call);
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
index edcd555a21..18bb578f01 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ServerCallToObserverAdapter.java
@@ -47,7 +47,7 @@ public class ServerCallToObserverAdapter<T> extends 
CancelableStreamObserver<T>
     }
 
 
-    private boolean isTerminated() {
+    public boolean isTerminated() {
         return terminated;
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
new file mode 100644
index 0000000000..9eab281828
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorPublisher.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * The middle layer between {@link 
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive 
API. <p>
+ * 1. passing the data received by CallStreamObserver to Reactive consumer <br>
+ * 2. passing the request of Reactive API to CallStreamObserver
+ */
+public abstract class AbstractTripleReactorPublisher<T> extends 
CancelableStreamObserver<T> implements Publisher<T>, Subscription {
+
+    private boolean canRequest;
+
+    private long requested;
+
+    // weather publisher has been subscribed
+    private final AtomicBoolean SUBSCRIBED = new AtomicBoolean();
+
+    private volatile Subscriber<? super T> downstream;
+
+    protected volatile CallStreamObserver<?> subscription;
+
+    private final AtomicBoolean HAS_SUBSCRIPTION = new AtomicBoolean();
+
+    // cancel status
+    private volatile boolean isCancelled;
+
+    // complete status
+    private volatile boolean isDone;
+
+    // to help bind TripleSubscriber
+    private volatile Consumer<CallStreamObserver<?>> onSubscribe;
+
+    private volatile Runnable shutdownHook;
+
+    private final AtomicBoolean CALLED_SHUT_DOWN_HOOK = new AtomicBoolean();
+
+    public AbstractTripleReactorPublisher() {
+    }
+
+    public AbstractTripleReactorPublisher(Consumer<CallStreamObserver<?>> 
onSubscribe, Runnable shutdownHook) {
+        this.onSubscribe = onSubscribe;
+        this.shutdownHook = shutdownHook;
+    }
+
+    protected void onSubscribe(final CallStreamObserver<?> subscription) {
+        if (subscription != null && this.subscription == null && 
HAS_SUBSCRIPTION.compareAndSet(false, true)) {
+            this.subscription = subscription;
+            subscription.disableAutoFlowControl();
+            if (onSubscribe != null) {
+                onSubscribe.accept(subscription);
+            }
+            return;
+        }
+
+        throw new IllegalStateException(getClass().getSimpleName() + " 
supports only a single subscription");
+    }
+
+    @Override
+    public void onNext(T data) {
+        if (isDone || isCancelled) {
+            return;
+        }
+        downstream.onNext(data);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        if (isDone || isCancelled) {
+            return;
+        }
+        isDone = true;
+        downstream.onError(throwable);
+        doPostShutdown();
+    }
+
+    @Override
+    public void onCompleted() {
+        if (isDone || isCancelled) {
+            return;
+        }
+        isDone = true;
+        downstream.onComplete();
+        doPostShutdown();
+    }
+
+    private void doPostShutdown() {
+        Runnable r = shutdownHook;
+        // CAS to confirm shutdownHook will be run only once.
+        if (r != null && CALLED_SHUT_DOWN_HOOK.compareAndSet(false, true)) {
+            shutdownHook = null;
+            r.run();
+        }
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        if (subscriber == null) {
+            throw new NullPointerException();
+        }
+
+        if (SUBSCRIBED.compareAndSet(false, true)) {
+            subscriber.onSubscribe(this);
+            this.downstream = subscriber;
+            if (isCancelled) {
+                this.downstream = null;
+            }
+        }
+    }
+
+    @Override
+    public void request(long l) {
+        synchronized (this) {
+            if (SUBSCRIBED.get() && canRequest) {
+                subscription.request(l >= Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) l);
+            } else {
+                requested += l;
+            }
+        }
+    }
+
+    @Override
+    public void startRequest() {
+        synchronized (this) {
+            if (!canRequest) {
+                canRequest = true;
+                long count = requested;
+                subscription.request(count >= Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) count);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        if (isCancelled) {
+            return;
+        }
+        isCancelled = true;
+        doPostShutdown();
+    }
+
+    public boolean isCancelled() {
+        return isCancelled;
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
new file mode 100644
index 0000000000..40acec2e9b
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/AbstractTripleReactorSubscriber.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.util.annotation.NonNull;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The middle layer between {@link 
org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Reactive 
API. <br>
+ * Passing the data from Reactive producer to CallStreamObserver.
+ */
+public abstract class AbstractTripleReactorSubscriber<T> implements 
Subscriber<T>, CoreSubscriber<T> {
+
+    private volatile boolean isCancelled;
+
+    protected volatile CallStreamObserver<T> downstream;
+
+    private final AtomicBoolean SUBSCRIBED = new AtomicBoolean();
+
+    private volatile Subscription subscription;
+
+    private final AtomicBoolean HAS_SUBSCRIBED = new AtomicBoolean();
+
+    // complete status
+    private volatile boolean isDone;
+
+    /**
+     * Binding the downstream, and call subscription#request(1).
+     *
+     * @param downstream downstream
+     */
+    public void subscribe(final CallStreamObserver<T> downstream) {
+        if (downstream == null) {
+            throw new NullPointerException();
+        }
+        if (this.downstream == null && SUBSCRIBED.compareAndSet(false, true)) {
+            this.downstream = downstream;
+            subscription.request(1);
+        }
+    }
+
+    @Override
+    public void onSubscribe(@NonNull final Subscription subscription) {
+        if (this.subscription == null && HAS_SUBSCRIBED.compareAndSet(false, 
true)) {
+            this.subscription = subscription;
+            return;
+        }
+        // onSubscribe cannot be called repeatedly
+        subscription.cancel();
+    }
+
+    @Override
+    public void onNext(T t) {
+        if (!isDone && !isCanceled()) {
+            downstream.onNext(t);
+            subscription.request(1);
+        }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        if (!isCanceled()) {
+            isDone = true;
+            downstream.onError(throwable);
+        }
+    }
+
+    @Override
+    public void onComplete() {
+        if (!isCanceled()) {
+            isDone = true;
+            downstream.onCompleted();
+        }
+    }
+
+    public void cancel() {
+        if (!isCancelled && subscription != null) {
+            isCancelled = true;
+            subscription.cancel();
+        }
+    }
+
+    public boolean isCanceled() {
+        return isCancelled;
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
new file mode 100644
index 0000000000..1deab5ac0b
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorPublisher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+
+import java.util.function.Consumer;
+
+/**
+ * Used in OneToMany & ManyToOne & ManyToMany in client. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for responseStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to 
requestStream.
+ */
+public class ClientTripleReactorPublisher<T> extends 
AbstractTripleReactorPublisher<T> {
+
+    public ClientTripleReactorPublisher() {
+    }
+
+    public ClientTripleReactorPublisher(Consumer<CallStreamObserver<?>> 
onSubscribe, Runnable shutdownHook) {
+        super(onSubscribe, shutdownHook);
+    }
+
+    @Override
+    public void beforeStart(ClientCallToObserverAdapter<T> 
clientCallToObserverAdapter) {
+        super.onSubscribe(clientCallToObserverAdapter);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
similarity index 60%
copy from 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
copy to 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
index 3a059bc5fb..3d5936d61f 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ClientTripleReactorSubscriber.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.dubbo.rpc.protocol.tri;
+package org.apache.dubbo.rpc.protocol.tri.reactive;
 
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
 
-public abstract class CancelableStreamObserver<T> implements StreamObserver<T> 
{
-
-    private CancellationContext cancellationContext;
-
-    public void setCancellationContext(CancellationContext 
cancellationContext) {
-        this.cancellationContext = cancellationContext;
-    }
+/**
+ * The subscriber in client to subscribe user publisher and is subscribed by 
ClientStreamObserver.
+ */
+public class ClientTripleReactorSubscriber<T> extends 
AbstractTripleReactorSubscriber<T> {
 
-    public void cancel(Throwable throwable) {
-        cancellationContext.cancel(throwable);
+    @Override
+    public void cancel() {
+        if (!isCanceled()) {
+            super.cancel();
+            ((ClientCallToObserverAdapter<T>) downstream).cancel(new 
Exception("Cancelled"));
+        }
     }
-
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
similarity index 57%
copy from 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
copy to 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
index 3a059bc5fb..33addf5d65 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorPublisher.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.dubbo.rpc.protocol.tri;
+package org.apache.dubbo.rpc.protocol.tri.reactive;
 
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
-public abstract class CancelableStreamObserver<T> implements StreamObserver<T> 
{
-
-    private CancellationContext cancellationContext;
-
-    public void setCancellationContext(CancellationContext 
cancellationContext) {
-        this.cancellationContext = cancellationContext;
-    }
+/**
+ * Used in ManyToOne and ManyToMany in server. <br>
+ * It is a Publisher for user subscriber to subscribe. <br>
+ * It is a StreamObserver for requestStream. <br>
+ * It is a Subscription for user subscriber to request and pass request to 
responseStream.
+ */
+public class ServerTripleReactorPublisher<T> extends 
AbstractTripleReactorPublisher<T> {
 
-    public void cancel(Throwable throwable) {
-        cancellationContext.cancel(throwable);
+    public ServerTripleReactorPublisher(CallStreamObserver<?> 
callStreamObserver) {
+        super.onSubscribe(callStreamObserver);
     }
-
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
new file mode 100644
index 0000000000..32453ac372
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/ServerTripleReactorSubscriber.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.CancellationContext;
+import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+
+/**
+ * The Subscriber in server to passing the data produced by user publisher to 
responseStream.
+ */
+public class ServerTripleReactorSubscriber<T> extends 
AbstractTripleReactorSubscriber<T>{
+
+    @Override
+    public void subscribe(CallStreamObserver<T> downstream) {
+        super.subscribe(downstream);
+        if (downstream instanceof CancelableStreamObserver) {
+            CancelableStreamObserver<?> observer = 
(CancelableStreamObserver<?>) downstream;
+            final CancellationContext context;
+            if (observer.getCancellationContext() == null) {
+                context = new CancellationContext();
+                observer.setCancellationContext(context);
+            } else {
+                context = observer.getCancellationContext();
+            }
+            context.addListener(ctx -> super.cancel());
+        }
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
new file mode 100644
index 0000000000..4ae6d45755
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorClientCalls.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.StubMethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.ClientTripleReactorPublisher;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.ClientTripleReactorSubscriber;
+import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * A collection of methods to convert client-side Reactor calls to stream 
calls.
+ */
+public final class ReactorClientCalls {
+
+    private ReactorClientCalls() {
+    }
+
+    /**
+     * Implements a unary -> unary call as Mono -> Mono
+     *
+     * @param invoker invoker
+     * @param monoRequest the mono with request
+     * @param methodDescriptor the method descriptor
+     * @return the mono with response
+     */
+    public static <TRequest, TResponse, TInvoker> Mono<TResponse> 
oneToOne(Invoker<TInvoker> invoker,
+                                                                 
Mono<TRequest> monoRequest,
+                                                                 
StubMethodDescriptor methodDescriptor) {
+        try {
+            return Mono.create(emitter -> monoRequest.subscribe(
+                    request -> StubInvocationUtil.unaryCall(invoker, 
methodDescriptor, request, new StreamObserver<TResponse>() {
+                        @Override
+                        public void onNext(TResponse tResponse) {
+                            emitter.success(tResponse);
+                        }
+
+                        @Override
+                        public void onError(Throwable throwable) {
+                            emitter.error(throwable);
+                        }
+
+                        @Override
+                        public void onCompleted() {
+                            // Do nothing
+                        }
+                    }),
+                    emitter::error
+                ));
+        } catch (Throwable throwable) {
+            return Mono.error(throwable);
+        }
+    }
+
+    /**
+     * Implements a unary -> stream call as Mono -> Flux
+     *
+     * @param invoker invoker
+     * @param monoRequest the mono with request
+     * @param methodDescriptor the method descriptor
+     * @return the flux with response
+     */
+    public static <TRequest, TResponse, TInvoker> Flux<TResponse> 
oneToMany(Invoker<TInvoker> invoker,
+                                                                            
Mono<TRequest> monoRequest,
+                                                                            
StubMethodDescriptor methodDescriptor) {
+        try {
+            return monoRequest
+                .flatMapMany(request -> {
+                    ClientTripleReactorPublisher<TResponse> clientPublisher = 
new ClientTripleReactorPublisher<>();
+                    StubInvocationUtil.serverStreamCall(invoker, 
methodDescriptor, request, clientPublisher);
+                    return clientPublisher;
+                });
+        } catch (Throwable throwable) {
+            return Flux.error(throwable);
+        }
+    }
+
+    /**
+     * Implements a stream -> unary call as Flux -> Mono
+     *
+     * @param invoker invoker
+     * @param requestFlux the flux with request
+     * @param methodDescriptor the method descriptor
+     * @return the mono with response
+     */
+    public static <TRequest, TResponse, TInvoker> Mono<TResponse> 
manyToOne(Invoker<TInvoker> invoker,
+                                                                            
Flux<TRequest> requestFlux,
+                                                                            
StubMethodDescriptor methodDescriptor) {
+        try {
+            ClientTripleReactorSubscriber<TRequest> clientSubscriber = 
requestFlux.subscribeWith(new ClientTripleReactorSubscriber<>());
+            ClientTripleReactorPublisher<TResponse> clientPublisher = new 
ClientTripleReactorPublisher<>(
+                s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) 
s),
+                clientSubscriber::cancel);
+            return Mono.from(clientPublisher).doOnSubscribe(dummy ->
+                StubInvocationUtil.biOrClientStreamCall(invoker, 
methodDescriptor, clientPublisher));
+        } catch (Throwable throwable) {
+            return Mono.error(throwable);
+        }
+    }
+
+    /**
+     * Implements a stream -> stream call as Flux -> Flux
+     *
+     * @param invoker invoker
+     * @param requestFlux the flux with request
+     * @param methodDescriptor the method descriptor
+     * @return the flux with response
+     */
+    public static <TRequest, TResponse, TInvoker> Flux<TResponse> 
manyToMany(Invoker<TInvoker> invoker,
+                                                                             
Flux<TRequest> requestFlux,
+                                                                             
StubMethodDescriptor methodDescriptor) {
+        try {
+            ClientTripleReactorSubscriber<TRequest> clientSubscriber = 
requestFlux.subscribeWith(new ClientTripleReactorSubscriber<>());
+            ClientTripleReactorPublisher<TResponse> clientPublisher = new 
ClientTripleReactorPublisher<>(
+                s -> clientSubscriber.subscribe((CallStreamObserver<TRequest>) 
s),
+                clientSubscriber::cancel);
+            return Flux.from(clientPublisher).doOnSubscribe(dummy ->
+                StubInvocationUtil.biOrClientStreamCall(invoker, 
methodDescriptor, clientPublisher));
+        } catch (Throwable throwable) {
+            return Flux.error(throwable);
+        }
+    }
+
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
new file mode 100644
index 0000000000..6454d26ba3
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/calls/ReactorServerCalls.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.calls;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorPublisher;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.ServerTripleReactorSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A collection of methods to convert server-side stream calls to Reactor 
calls.
+ */
+public final class ReactorServerCalls {
+
+    private ReactorServerCalls() {
+    }
+
+    /**
+     * Implements a unary -> unary call as Mono -> Mono
+     *
+     * @param request request
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     */
+    public static <T, R> void oneToOne(T request,
+                                       StreamObserver<R> responseObserver,
+                                       Function<Mono<T>, Mono<R>> func) {
+        func.apply(Mono.just(request)).subscribe(res -> {
+            CompletableFuture.completedFuture(res)
+                .whenComplete((r, t) -> {
+                    if (t != null) {
+                        responseObserver.onError(t);
+                    } else {
+                        responseObserver.onNext(r);
+                        responseObserver.onCompleted();
+                    }
+                });
+        });
+    }
+
+    /**
+     * Implements a unary -> stream call as Mono -> Flux
+     *
+     * @param request request
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     */
+    public static <T, R> void oneToMany(T request,
+                                        StreamObserver<R> responseObserver,
+                                        Function<Mono<T>, Flux<R>> func) {
+        try {
+            Flux<R> response = func.apply(Mono.just(request));
+            ServerTripleReactorSubscriber<R> subscriber = 
response.subscribeWith(new ServerTripleReactorSubscriber<>());
+            subscriber.subscribe((ServerCallToObserverAdapter<R>) 
responseObserver);
+        } catch (Throwable throwable) {
+            responseObserver.onError(throwable);
+        }
+    }
+
+    /**
+     * Implements a stream -> unary call as Flux -> Mono
+     *
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     * @return request StreamObserver
+     */
+    public static <T, R> StreamObserver<T> manyToOne(StreamObserver<R> 
responseObserver,
+                                                      Function<Flux<T>, 
Mono<R>> func) {
+        ServerTripleReactorPublisher<T> serverPublisher = new 
ServerTripleReactorPublisher<T>((CallStreamObserver<R>) responseObserver);
+        try {
+            Mono<R> responseMono = func.apply(Flux.from(serverPublisher));
+            responseMono.subscribe(value -> {
+                    // Don't try to respond if the server has already canceled 
the request
+                    if (!serverPublisher.isCancelled()) {
+                        responseObserver.onNext(value);
+                    }
+                },
+                throwable -> {
+                    // Don't try to respond if the server has already canceled 
the request
+                    if (!serverPublisher.isCancelled()) {
+                        responseObserver.onError(throwable);
+                    }
+                },
+                responseObserver::onCompleted
+            );
+            serverPublisher.startRequest();
+        } catch (Throwable throwable) {
+            responseObserver.onError(throwable);
+        }
+        return serverPublisher;
+    }
+
+    /**
+     * Implements a stream -> stream call as Flux -> Flux
+     *
+     * @param responseObserver response StreamObserver
+     * @param func service implementation
+     * @return request StreamObserver
+     */
+    public static <T, R> StreamObserver<T> manyToMany(StreamObserver<R> 
responseObserver,
+                                                      Function<Flux<T>, 
Flux<R>> func) {
+        // responseObserver is also a subscription of publisher, we can use it 
to request more data
+        ServerTripleReactorPublisher<T> serverPublisher = new 
ServerTripleReactorPublisher<T>((CallStreamObserver<R>) responseObserver);
+        try {
+            Flux<R> responseFlux = func.apply(Flux.from(serverPublisher));
+            ServerTripleReactorSubscriber<R> serverSubscriber = 
responseFlux.subscribeWith(new ServerTripleReactorSubscriber<>());
+            serverSubscriber.subscribe((CallStreamObserver<R>) 
responseObserver);
+            serverPublisher.startRequest();
+        } catch (Throwable throwable) {
+            responseObserver.onError(throwable);
+        }
+
+        return serverPublisher;
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
new file mode 100644
index 0000000000..df583f6711
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToManyMethodHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * The handler of ManyToMany() method for stub invocation.
+ */
+public class ManyToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Flux<T>, Flux<R>> func;
+
+    public ManyToManyMethodHandler(Function<Flux<T>, Flux<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) 
arguments[0];
+        StreamObserver<T> requestObserver = 
ReactorServerCalls.manyToMany(responseObserver, func);
+        return CompletableFuture.completedFuture(requestObserver);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
new file mode 100644
index 0000000000..133f92a29b
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/ManyToOneMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * The handler of ManyToOne() method for stub invocation.
+ */
+public class ManyToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Flux<T>, Mono<R>> func;
+
+    public ManyToOneMethodHandler(Function<Flux<T>, Mono<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<StreamObserver<T>> invoke(Object[] arguments) {
+        CallStreamObserver<R> responseObserver = (CallStreamObserver<R>) 
arguments[0];
+        StreamObserver<T> requestObserver = 
ReactorServerCalls.manyToOne(responseObserver, func);
+        return CompletableFuture.completedFuture(requestObserver);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
new file mode 100644
index 0000000000..ef94d6d80e
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToManyMethodHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * The handler of OneToMany() method for stub invocation.
+ */
+public class OneToManyMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Mono<T>, Flux<R>> func;
+
+    public OneToManyMethodHandler(Function<Mono<T>, Flux<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<?> invoke(Object[] arguments) {
+        T request = (T) arguments[0];
+        StreamObserver<R> responseObserver = (StreamObserver<R>) arguments[1];
+        ReactorServerCalls.oneToMany(request, responseObserver, func);
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
new file mode 100644
index 0000000000..0a8b0a7191
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/reactive/handler/OneToOneMethodHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive.handler;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.reactive.calls.ReactorServerCalls;
+import org.apache.dubbo.rpc.stub.FutureToObserverAdaptor;
+import org.apache.dubbo.rpc.stub.StubMethodHandler;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * The handler of OneToOne() method for stub invocation.
+ */
+public class OneToOneMethodHandler<T, R> implements StubMethodHandler<T, R> {
+
+    private final Function<Mono<T>, Mono<R>> func;
+
+    public OneToOneMethodHandler(Function<Mono<T>, Mono<R>> func) {
+        this.func = func;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompletableFuture<R> invoke(Object[] arguments) {
+        T request = (T) arguments[0];
+        CompletableFuture<R> future = new CompletableFuture<>();
+        StreamObserver<R> responseObserver = new 
FutureToObserverAdaptor<>(future);
+        ReactorServerCalls.oneToOne(request, responseObserver, func);
+        return future;
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
new file mode 100644
index 0000000000..6669eaa7d9
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToManyMethodHandlerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToManyMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for ManyToManyMethodHandler
+ */
+public final class ManyToManyMethodHandlerTest {
+
+    @Test
+    public void testInvoke() throws ExecutionException, InterruptedException {
+        AtomicInteger nextCounter = new AtomicInteger();
+        AtomicInteger completeCounter = new AtomicInteger();
+        AtomicInteger errorCounter = new AtomicInteger();
+        ServerCallToObserverAdapter<String> responseObserver = 
Mockito.mock(ServerCallToObserverAdapter.class);
+        doAnswer(o -> nextCounter.incrementAndGet())
+            .when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet())
+            .when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet())
+            .when(responseObserver).onError(any(Throwable.class));
+        ManyToManyMethodHandler<String, String> handler = new 
ManyToManyMethodHandler<>(requestFlux ->
+            requestFlux.map(r -> r + "0"));
+        CompletableFuture<StreamObserver<String>> future = handler.invoke(new 
Object[]{responseObserver});
+        StreamObserver<String> requestObserver = future.get();
+        for (int i = 0; i < 10; i++) {
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(10, nextCounter.get());
+        Assertions.assertEquals(0, errorCounter.get());
+        Assertions.assertEquals(1, completeCounter.get());
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
new file mode 100644
index 0000000000..6cdde8d251
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/ManyToOneMethodHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.ManyToOneMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for ManyToOneMethodHandler
+ */
+public final class ManyToOneMethodHandlerTest {
+
+    @Test
+    public void testInvoker() throws ExecutionException, InterruptedException {
+        AtomicInteger nextCounter = new AtomicInteger();
+        AtomicInteger completeCounter = new AtomicInteger();
+        AtomicInteger errorCounter = new AtomicInteger();
+        ServerCallToObserverAdapter<String> responseObserver = 
Mockito.mock(ServerCallToObserverAdapter.class);
+        doAnswer(o -> nextCounter.incrementAndGet())
+            .when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet())
+            .when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet())
+            .when(responseObserver).onError(any(Throwable.class));
+        ManyToOneMethodHandler<String, String> handler = new 
ManyToOneMethodHandler<>(requestFlux ->
+            
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
+        CompletableFuture<StreamObserver<String>> future = handler.invoke(new 
Object[]{responseObserver});
+        StreamObserver<String> requestObserver = future.get();
+        for (int i = 0; i < 10; i++) {
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(1, nextCounter.get());
+        Assertions.assertEquals(0, errorCounter.get());
+        Assertions.assertEquals(1, completeCounter.get());
+    }
+
+    @Test
+    public void testError() throws ExecutionException, InterruptedException {
+        AtomicInteger nextCounter = new AtomicInteger();
+        AtomicInteger completeCounter = new AtomicInteger();
+        AtomicInteger errorCounter = new AtomicInteger();
+        ServerCallToObserverAdapter<String> responseObserver = 
Mockito.mock(ServerCallToObserverAdapter.class);
+        doAnswer(o -> nextCounter.incrementAndGet())
+            .when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet())
+            .when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet())
+            .when(responseObserver).onError(any(Throwable.class));
+        ManyToOneMethodHandler<String, String> handler = new 
ManyToOneMethodHandler<>(requestFlux ->
+            
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
+        CompletableFuture<StreamObserver<String>> future = handler.invoke(new 
Object[]{responseObserver});
+        StreamObserver<String> requestObserver = future.get();
+        for (int i = 0; i < 10; i++) {
+            if (i == 6) {
+                requestObserver.onError(new Throwable());
+            }
+            requestObserver.onNext(String.valueOf(i));
+        }
+        requestObserver.onCompleted();
+        Assertions.assertEquals(0, nextCounter.get());
+        Assertions.assertEquals(1, errorCounter.get());
+        Assertions.assertEquals(0, completeCounter.get());
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
new file mode 100644
index 0000000000..42b64536f1
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToManyMethodHandlerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToManyMethodHandler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Unit test for OneToManyMethodHandler
+ */
+public final class OneToManyMethodHandlerTest {
+
+    @Test
+    public void testInvoke() {
+        String request = "1,2,3,4,5,6,7";
+        AtomicInteger nextCounter = new AtomicInteger();
+        AtomicInteger completeCounter = new AtomicInteger();
+        AtomicInteger errorCounter = new AtomicInteger();
+        ServerCallToObserverAdapter<String> responseObserver = 
Mockito.mock(ServerCallToObserverAdapter.class);
+        doAnswer(o -> nextCounter.incrementAndGet())
+            .when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet())
+            .when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet())
+            .when(responseObserver).onError(any(Throwable.class));
+        OneToManyMethodHandler<String, String> handler = new 
OneToManyMethodHandler<>(requestMono ->
+            requestMono.flatMapMany(r -> Flux.fromArray(r.split(","))));
+        CompletableFuture<?> future = handler.invoke(new Object[]{request, 
responseObserver});
+        Assertions.assertTrue(future.isDone());
+        Assertions.assertEquals(7, nextCounter.get());
+        Assertions.assertEquals(0, errorCounter.get());
+        Assertions.assertEquals(1, completeCounter.get());
+    }
+
+    @Test
+    public void testError() {
+        String request = "1,2,3,4,5,6,7";
+        AtomicInteger nextCounter = new AtomicInteger();
+        AtomicInteger completeCounter = new AtomicInteger();
+        AtomicInteger errorCounter = new AtomicInteger();
+        ServerCallToObserverAdapter<String> responseObserver = 
Mockito.mock(ServerCallToObserverAdapter.class);
+        doAnswer(o -> nextCounter.incrementAndGet())
+            .when(responseObserver).onNext(anyString());
+        doAnswer(o -> completeCounter.incrementAndGet())
+            .when(responseObserver).onCompleted();
+        doAnswer(o -> errorCounter.incrementAndGet())
+            .when(responseObserver).onError(any(Throwable.class));
+        OneToManyMethodHandler<String, String> handler = new 
OneToManyMethodHandler<>(requestMono ->
+            Flux.create(emitter -> {
+                for (int i = 0; i < 10; i++) {
+                    if (i == 6) {
+                        emitter.error(new Throwable());
+                    } else {
+                        emitter.next(String.valueOf(i));
+                    }
+                }
+            }));
+        CompletableFuture<?> future = handler.invoke(new Object[]{request, 
responseObserver});
+        Assertions.assertTrue(future.isDone());
+        Assertions.assertEquals(6, nextCounter.get());
+        Assertions.assertEquals(1, errorCounter.get());
+        Assertions.assertEquals(0, completeCounter.get());
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
new file mode 100644
index 0000000000..5ba898ca8d
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/reactive/OneToOneMethodHandlerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.reactive;
+
+import 
org.apache.dubbo.rpc.protocol.tri.reactive.handler.OneToOneMethodHandler;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit test for OneToOneMethodHandler
+ */
+public final class OneToOneMethodHandlerTest {
+
+    @Test
+    public void testInvoke() throws ExecutionException, InterruptedException {
+        String request = "request";
+        OneToOneMethodHandler<String, String> handler = new 
OneToOneMethodHandler<>(requestMono ->
+            requestMono.map(r -> r + "Test"));
+        CompletableFuture<?> future = handler.invoke(new Object[]{request});
+        assertEquals("requestTest", future.get());
+    }
+}

Reply via email to