This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 863c2e8a0 RATIS-1987. Intermittent NPE in TestGrpcZeroCopy. (#1013)
863c2e8a0 is described below

commit 863c2e8a069337f1dd1ae3f2520d118a443fa612
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 15 13:28:14 2024 -0800

    RATIS-1987. Intermittent NPE in TestGrpcZeroCopy. (#1013)
---
 .../ratis/grpc/util/GrpcZeroCopyTestClient.java    | 43 +++++++++++-----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
index 791d5a6d2..ef66b72be 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
@@ -29,27 +29,24 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 /** gRPC client for testing */
 class GrpcZeroCopyTestClient implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcZeroCopyTestClient.class);
 
-  @FunctionalInterface
-  interface StreamObserverFactory
-      extends BiFunction<GreeterStub, StreamObserver<HelloReply>, StreamObserver<HelloRequest>> {
-  }
-
   private final ManagedChannel channel;
 
   private final StreamObserver<HelloRequest> helloRequestHandler;
@@ -67,7 +64,9 @@ class GrpcZeroCopyTestClient implements Closeable {
     final StreamObserver<HelloReply> helloResponseHandler = new StreamObserver<HelloReply>() {
       @Override
       public void onNext(HelloReply helloReply) {
-        helloReplies.poll().complete(helloReply.getMessage());
+        final CompletableFuture<String> polled = helloReplies.poll();
+        Objects.requireNonNull(polled, "polled");
+        polled.complete(helloReply.getMessage());
       }
 
       @Override
@@ -93,7 +92,9 @@ class GrpcZeroCopyTestClient implements Closeable {
     final StreamObserver<BinaryReply> binaryResponseHandler = new StreamObserver<BinaryReply>() {
       @Override
       public void onNext(BinaryReply binaryReply) {
-        binaryReplies.poll().complete(binaryReply.getData());
+        final CompletableFuture<ByteString> polled = binaryReplies.poll();
+        Objects.requireNonNull(polled, "polled");
+        polled.complete(binaryReply.getData());
       }
 
       @Override
@@ -132,29 +133,27 @@ class GrpcZeroCopyTestClient implements Closeable {
   CompletableFuture<String> send(String name) {
     LOG.info("send message {}", name);
     final HelloRequest request = HelloRequest.newBuilder().setName(name).build();
-    final CompletableFuture<String> f = new CompletableFuture<>();
-    try {
-      helloRequestHandler.onNext(request);
-      helloReplies.offer(f);
-    } catch (IllegalStateException e) {
-      // already closed
-      f.completeExceptionally(e);
-    }
-    return f;
+    return send(request, helloReplies, helloRequestHandler::onNext);
   }
 
   CompletableFuture<ByteString> send(ByteBuffer data) {
     LOG.info("send data: size={}, direct? {}", data.remaining(), data.isDirect());
     final BinaryRequest request = BinaryRequest.newBuilder().setData(UnsafeByteOperations.unsafeWrap(data)).build();
-    final CompletableFuture<ByteString> f = new CompletableFuture<>();
+    return send(request, binaryReplies, binaryRequestHandler::onNext);
+  }
+
+  static <REQUEST, REPLY> CompletableFuture<REPLY> send(REQUEST request,
+      Queue<CompletableFuture<REPLY>> queue, Consumer<REQUEST> onNext) {
+    final CompletableFuture<REPLY> f = new CompletableFuture<>();
+    queue.offer(f);
     try {
-      binaryRequestHandler.onNext(request);
-      binaryReplies.offer(f);
-    } catch (IllegalStateException e) {
+      onNext.accept(request);
+    } catch (Exception e) {
       // already closed
       f.completeExceptionally(e);
+      final CompletableFuture<REPLY> polled = queue.poll();
+      Preconditions.assertSame(f, polled, "future");
     }
     return f;
   }
-
 }
\ No newline at end of file

Reply via email to