This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 5d1c0eae1c fix(3.2): Triple Reactor OneToMany Handler null pointer fix
and DubboFilter support (#14125)
5d1c0eae1c is described below
commit 5d1c0eae1cf715ab469c5201a183963dd387e765
Author: caoyanan666 <[email protected]>
AuthorDate: Sat May 11 14:49:28 2024 +0800
fix(3.2): Triple Reactor OneToMany Handler null pointer fix and DubboFilter
support (#14125)
* ReactorDubbo3TripleStub.mustache add schema registry
* Triple Reactor OneToMany Handler null pointer fix and DubboFilter support
* trigger ci
* Adjust the order
* trigger ci
* ServerTripleReactorSubscriber meaningful
---------
Co-authored-by: caoyanan <[email protected]>
Co-authored-by: Ken Liu <[email protected]>
---
.../reactive/AbstractTripleReactorSubscriber.java | 2 +-
.../reactive/ServerTripleReactorSubscriber.java | 42 ++++++++++++++++++++++
.../dubbo/reactive/calls/ReactorServerCalls.java | 17 ++++++---
.../reactive/handler/OneToManyMethodHandler.java | 3 +-
4 files changed, 57 insertions(+), 7 deletions(-)
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
index 86db857582..b3d6fa058e 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
@@ -53,7 +53,7 @@ public abstract class AbstractTripleReactorSubscriber<T>
implements Subscriber<T
if (downstream == null) {
throw new NullPointerException();
}
- if (this.downstream == null && SUBSCRIBED.compareAndSet(false, true)) {
+ if (SUBSCRIBED.compareAndSet(false, true)) {
this.downstream = downstream;
subscription.request(1);
}
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
index ce4206b2b6..3a4a9729a0 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
@@ -20,11 +20,31 @@ import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
/**
* The Subscriber in server to passing the data produced by user publisher to
responseStream.
*/
public class ServerTripleReactorSubscriber<T> extends
AbstractTripleReactorSubscriber<T> {
+ /**
+ * The execution future of the current task, in order to be returned to
stubInvoker
+ */
+ private final CompletableFuture<List<T>> executionFuture = new
CompletableFuture<>();
+ /**
+ * The result elements collected by the current task.
+ * This class is a flux subscriber, which usually means there will be
multiple elements, so it is declared as a list type.
+ */
+ private final List<T> collectedData = new ArrayList<>();
+
+ public ServerTripleReactorSubscriber() {}
+
+ public ServerTripleReactorSubscriber(CallStreamObserver<T> streamObserver)
{
+ this.downstream = streamObserver;
+ }
+
@Override
public void subscribe(CallStreamObserver<T> downstream) {
super.subscribe(downstream);
@@ -40,4 +60,26 @@ public class ServerTripleReactorSubscriber<T> extends
AbstractTripleReactorSubsc
context.addListener(ctx -> super.cancel());
}
}
+
+ @Override
+ public void onNext(T t) {
+ super.onNext(t);
+ collectedData.add(t);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ super.onError(throwable);
+ executionFuture.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ executionFuture.complete(this.collectedData);
+ }
+
+ public CompletableFuture<List<T>> getExecutionFuture() {
+ return executionFuture;
+ }
}
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index f6a39c944c..24218a1b08 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -24,6 +24,8 @@ import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import reactor.core.publisher.Flux;
@@ -65,14 +67,21 @@ public final class ReactorServerCalls {
* @param responseObserver response StreamObserver
* @param func service implementation
*/
- public static <T, R> void oneToMany(
+ public static <T, R> CompletableFuture<List<R>> oneToMany(
T request, StreamObserver<R> responseObserver, Function<Mono<T>,
Flux<R>> func) {
try {
+ ServerCallToObserverAdapter<R> serverCallToObserverAdapter =
+ (ServerCallToObserverAdapter<R>) responseObserver;
Flux<R> response = func.apply(Mono.just(request));
- ServerTripleReactorSubscriber<R> subscriber =
response.subscribeWith(new ServerTripleReactorSubscriber<>());
- subscriber.subscribe((ServerCallToObserverAdapter<R>)
responseObserver);
+ ServerTripleReactorSubscriber<R> reactorSubscriber =
+ new
ServerTripleReactorSubscriber<>(serverCallToObserverAdapter);
+
response.subscribeWith(reactorSubscriber).subscribe(serverCallToObserverAdapter);
+ return reactorSubscriber.getExecutionFuture();
} catch (Throwable throwable) {
- responseObserver.onError(throwable);
+ doOnResponseHasException(throwable, responseObserver);
+ CompletableFuture<List<R>> future = new CompletableFuture<>();
+ future.completeExceptionally(throwable);
+ return future;
}
}
diff --git
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
index b5b0534fff..fc2e4df9f0 100644
---
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
+++
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
@@ -42,7 +42,6 @@ public class OneToManyMethodHandler<T, R> implements
StubMethodHandler<T, R> {
public CompletableFuture<?> invoke(Object[] arguments) {
T request = (T) arguments[0];
StreamObserver<R> responseObserver = (StreamObserver<R>) arguments[1];
- ReactorServerCalls.oneToMany(request, responseObserver, func);
- return CompletableFuture.completedFuture(null);
+ return ReactorServerCalls.oneToMany(request, responseObserver, func);
}
}