This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 1abe9f8087 fix(3.2): The oneToOne method of the ReactorServerCalls
class will cause the request to hang when the result is Mono Empty (#14121)
1abe9f8087 is described below
commit 1abe9f8087d68a0d88efa438d762ec7bfeb52fd9
Author: caoyanan666 <[email protected]>
AuthorDate: Wed May 8 14:17:18 2024 +0800
fix(3.2): The oneToOne method of the ReactorServerCalls class will cause
the request to hang when the result is Mono Empty (#14121)
* Fix triple reactor request hung when result is Mono Empty
* code format
* fix compile
---------
Co-authored-by: caoyanan <[email protected]>
---
.../dubbo/reactive/calls/ReactorServerCalls.java | 31 ++++++++++++++--------
1 file changed, 20 insertions(+), 11 deletions(-)
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 58ec934c42..f6a39c944c 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -19,10 +19,11 @@ package org.apache.dubbo.reactive.calls;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
+import org.apache.dubbo.rpc.StatusRpcException;
+import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import reactor.core.publisher.Flux;
@@ -43,16 +44,18 @@ public final class ReactorServerCalls {
* @param func service implementation
*/
public static <T, R> void oneToOne(T request, StreamObserver<R>
responseObserver, Function<Mono<T>, Mono<R>> func) {
- func.apply(Mono.just(request)).subscribe(res -> {
- CompletableFuture.completedFuture(res).whenComplete((r, t) -> {
- if (t != null) {
- responseObserver.onError(t);
- } else {
- responseObserver.onNext(r);
- responseObserver.onCompleted();
- }
- });
- });
+ try {
+ func.apply(Mono.just(request))
+ .subscribe(
+ res -> {
+ responseObserver.onNext(res);
+ responseObserver.onCompleted();
+ },
+ throwable -> doOnResponseHasException(throwable,
responseObserver),
+ () ->
doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(),
responseObserver));
+ } catch (Throwable throwable) {
+ doOnResponseHasException(throwable, responseObserver);
+ }
}
/**
@@ -131,4 +134,10 @@ public final class ReactorServerCalls {
return serverPublisher;
}
+
+ private static void doOnResponseHasException(Throwable throwable,
StreamObserver<?> responseObserver) {
+ StatusRpcException statusRpcException =
+ TriRpcStatus.getStatus(throwable).asException();
+ responseObserver.onError(statusRpcException);
+ }
}