This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 5d1c0eae1c fix(3.2): Triple Reactor OneToMany Handler null pointer fix 
and DubboFilter support (#14125)
5d1c0eae1c is described below

commit 5d1c0eae1cf715ab469c5201a183963dd387e765
Author: caoyanan666 <[email protected]>
AuthorDate: Sat May 11 14:49:28 2024 +0800

    fix(3.2): Triple Reactor OneToMany Handler null pointer fix and DubboFilter 
support (#14125)
    
    * ReactorDubbo3TripleStub.mustache add schema registry
    
    * Triple Reactor OneToMany Handler null pointer fix and DubboFilter support
    
    * trigger ci
    
    * Adjust the order
    
    * trigger ci
    
    * ServerTripleReactorSubscriber meaningful
    
    ---------
    
    Co-authored-by: caoyanan <[email protected]>
    Co-authored-by: Ken Liu <[email protected]>
---
 .../reactive/AbstractTripleReactorSubscriber.java  |  2 +-
 .../reactive/ServerTripleReactorSubscriber.java    | 42 ++++++++++++++++++++++
 .../dubbo/reactive/calls/ReactorServerCalls.java   | 17 ++++++---
 .../reactive/handler/OneToManyMethodHandler.java   |  3 +-
 4 files changed, 57 insertions(+), 7 deletions(-)

diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
index 86db857582..b3d6fa058e 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/AbstractTripleReactorSubscriber.java
@@ -53,7 +53,7 @@ public abstract class AbstractTripleReactorSubscriber<T> 
implements Subscriber<T
         if (downstream == null) {
             throw new NullPointerException();
         }
-        if (this.downstream == null && SUBSCRIBED.compareAndSet(false, true)) {
+        if (SUBSCRIBED.compareAndSet(false, true)) {
             this.downstream = downstream;
             subscription.request(1);
         }
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
index ce4206b2b6..3a4a9729a0 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/ServerTripleReactorSubscriber.java
@@ -20,11 +20,31 @@ import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The Subscriber in server to passing the data produced by user publisher to 
responseStream.
  */
 public class ServerTripleReactorSubscriber<T> extends 
AbstractTripleReactorSubscriber<T> {
 
+    /**
+     * The execution future of the current task, in order to be returned to 
stubInvoker
+     */
+    private final CompletableFuture<List<T>> executionFuture = new 
CompletableFuture<>();
+    /**
+     * The result elements collected by the current task.
+     * This class is a flux subscriber, which usually means there will be 
multiple elements, so it is declared as a list type.
+     */
+    private final List<T> collectedData = new ArrayList<>();
+
+    public ServerTripleReactorSubscriber() {}
+
+    public ServerTripleReactorSubscriber(CallStreamObserver<T> streamObserver) 
{
+        this.downstream = streamObserver;
+    }
+
     @Override
     public void subscribe(CallStreamObserver<T> downstream) {
         super.subscribe(downstream);
@@ -40,4 +60,26 @@ public class ServerTripleReactorSubscriber<T> extends 
AbstractTripleReactorSubsc
             context.addListener(ctx -> super.cancel());
         }
     }
+
+    @Override
+    public void onNext(T t) {
+        super.onNext(t);
+        collectedData.add(t);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        super.onError(throwable);
+        executionFuture.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onComplete() {
+        super.onComplete();
+        executionFuture.complete(this.collectedData);
+    }
+
+    public CompletableFuture<List<T>> getExecutionFuture() {
+        return executionFuture;
+    }
 }
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index f6a39c944c..24218a1b08 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@ -24,6 +24,8 @@ import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 import reactor.core.publisher.Flux;
@@ -65,14 +67,21 @@ public final class ReactorServerCalls {
      * @param responseObserver response StreamObserver
      * @param func service implementation
      */
-    public static <T, R> void oneToMany(
+    public static <T, R> CompletableFuture<List<R>> oneToMany(
             T request, StreamObserver<R> responseObserver, Function<Mono<T>, 
Flux<R>> func) {
         try {
+            ServerCallToObserverAdapter<R> serverCallToObserverAdapter =
+                    (ServerCallToObserverAdapter<R>) responseObserver;
             Flux<R> response = func.apply(Mono.just(request));
-            ServerTripleReactorSubscriber<R> subscriber = 
response.subscribeWith(new ServerTripleReactorSubscriber<>());
-            subscriber.subscribe((ServerCallToObserverAdapter<R>) 
responseObserver);
+            ServerTripleReactorSubscriber<R> reactorSubscriber =
+                    new 
ServerTripleReactorSubscriber<>(serverCallToObserverAdapter);
+            
response.subscribeWith(reactorSubscriber).subscribe(serverCallToObserverAdapter);
+            return reactorSubscriber.getExecutionFuture();
         } catch (Throwable throwable) {
-            responseObserver.onError(throwable);
+            doOnResponseHasException(throwable, responseObserver);
+            CompletableFuture<List<R>> future = new CompletableFuture<>();
+            future.completeExceptionally(throwable);
+            return future;
         }
     }
 
diff --git 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
index b5b0534fff..fc2e4df9f0 100644
--- 
a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
+++ 
b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/handler/OneToManyMethodHandler.java
@@ -42,7 +42,6 @@ public class OneToManyMethodHandler<T, R> implements 
StubMethodHandler<T, R> {
     public CompletableFuture<?> invoke(Object[] arguments) {
         T request = (T) arguments[0];
         StreamObserver<R> responseObserver = (StreamObserver<R>) arguments[1];
-        ReactorServerCalls.oneToMany(request, responseObserver, func);
-        return CompletableFuture.completedFuture(null);
+        return ReactorServerCalls.oneToMany(request, responseObserver, func);
     }
 }

Reply via email to