This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 863c2e8a0 RATIS-1987. Intermittent NPE in TestGrpcZeroCopy. (#1013)
863c2e8a0 is described below
commit 863c2e8a069337f1dd1ae3f2520d118a443fa612
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 15 13:28:14 2024 -0800
RATIS-1987. Intermittent NPE in TestGrpcZeroCopy. (#1013)
---
.../ratis/grpc/util/GrpcZeroCopyTestClient.java | 43 +++++++++++-----------
1 file changed, 21 insertions(+), 22 deletions(-)
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
index 791d5a6d2..ef66b72be 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcZeroCopyTestClient.java
@@ -29,27 +29,24 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.function.Consumer;
/** gRPC client for testing */
class GrpcZeroCopyTestClient implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcZeroCopyTestClient.class);
- @FunctionalInterface
- interface StreamObserverFactory
- extends BiFunction<GreeterStub, StreamObserver<HelloReply>,
StreamObserver<HelloRequest>> {
- }
-
private final ManagedChannel channel;
private final StreamObserver<HelloRequest> helloRequestHandler;
@@ -67,7 +64,9 @@ class GrpcZeroCopyTestClient implements Closeable {
final StreamObserver<HelloReply> helloResponseHandler = new
StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply helloReply) {
- helloReplies.poll().complete(helloReply.getMessage());
+ final CompletableFuture<String> polled = helloReplies.poll();
+ Objects.requireNonNull(polled, "polled");
+ polled.complete(helloReply.getMessage());
}
@Override
@@ -93,7 +92,9 @@ class GrpcZeroCopyTestClient implements Closeable {
final StreamObserver<BinaryReply> binaryResponseHandler = new
StreamObserver<BinaryReply>() {
@Override
public void onNext(BinaryReply binaryReply) {
- binaryReplies.poll().complete(binaryReply.getData());
+ final CompletableFuture<ByteString> polled = binaryReplies.poll();
+ Objects.requireNonNull(polled, "polled");
+ polled.complete(binaryReply.getData());
}
@Override
@@ -132,29 +133,27 @@ class GrpcZeroCopyTestClient implements Closeable {
CompletableFuture<String> send(String name) {
LOG.info("send message {}", name);
final HelloRequest request =
HelloRequest.newBuilder().setName(name).build();
- final CompletableFuture<String> f = new CompletableFuture<>();
- try {
- helloRequestHandler.onNext(request);
- helloReplies.offer(f);
- } catch (IllegalStateException e) {
- // already closed
- f.completeExceptionally(e);
- }
- return f;
+ return send(request, helloReplies, helloRequestHandler::onNext);
}
CompletableFuture<ByteString> send(ByteBuffer data) {
LOG.info("send data: size={}, direct? {}", data.remaining(),
data.isDirect());
final BinaryRequest request =
BinaryRequest.newBuilder().setData(UnsafeByteOperations.unsafeWrap(data)).build();
- final CompletableFuture<ByteString> f = new CompletableFuture<>();
+ return send(request, binaryReplies, binaryRequestHandler::onNext);
+ }
+
+ static <REQUEST, REPLY> CompletableFuture<REPLY> send(REQUEST request,
+ Queue<CompletableFuture<REPLY>> queue, Consumer<REQUEST> onNext) {
+ final CompletableFuture<REPLY> f = new CompletableFuture<>();
+ queue.offer(f);
try {
- binaryRequestHandler.onNext(request);
- binaryReplies.offer(f);
- } catch (IllegalStateException e) {
+ onNext.accept(request);
+ } catch (Exception e) {
// already closed
f.completeExceptionally(e);
+ final CompletableFuture<REPLY> polled = queue.poll();
+ Preconditions.assertSame(f, polled, "future");
}
return f;
}
-
}
\ No newline at end of file