This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 08dc58fcba8b66480066f0b9f0cc9860415a9346 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Mar 6 20:32:54 2023 -0800 RATIS-1807. Support timeout in gRPC. (#842) (cherry picked from commit 69263c884573b87eada10bdd269c9e9d31fb2e94) --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 7 +- .../grpc/server/GrpcServerProtocolClient.java | 7 +- .../grpc/util/ResponseNotifyClientInterceptor.java | 72 ++++++++++++ .../ratis/grpc/util/StreamObserverWithTimeout.java | 95 ++++++++++++++++ ratis-proto/src/main/proto/Test.proto | 37 +++++++ .../org/apache/ratis/grpc/util/GrpcTestClient.java | 123 +++++++++++++++++++++ .../org/apache/ratis/grpc/util/GrpcTestServer.java | 108 ++++++++++++++++++ .../grpc/util/TestStreamObserverWithTimeout.java | 122 ++++++++++++++++++++ 8 files changed, 566 insertions(+), 5 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index c91880097..0f975bf62 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -597,7 +597,8 @@ public class GrpcLogAppender extends LogAppenderBase { StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; final String requestId = UUID.randomUUID().toString(); try { - snapshotRequestObserver = getClient().installSnapshot(responseHandler); + snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot", + requestTimeoutDuration, responseHandler); for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { if (isRunning()) { snapshotRequestObserver.onNext(request); @@ -647,7 +648,9 @@ public class GrpcLogAppender extends LogAppenderBase { LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request)); } try { - snapshotRequestObserver = getClient().installSnapshot(responseHandler); + snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot", + requestTimeoutDuration, responseHandler); + snapshotRequestObserver.onNext(request); getFollower().updateLastRpcSendTime(false); responseHandler.addPending(request); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 4c28c1df4..c3f8730e7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.grpc.util.StreamObserverWithTimeout; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; @@ -136,9 +137,9 @@ public class GrpcServerProtocolClient implements Closeable { } StreamObserver<InstallSnapshotRequestProto> installSnapshot( - StreamObserver<InstallSnapshotReplyProto> responseHandler) { - return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .installSnapshot(responseHandler); + String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) { + return StreamObserverWithTimeout.newInstance(name, timeout, + i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler)); } // short-circuit the backoff timer and make them reconnect immediately. diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java new file mode 100644 index 000000000..77577b06d --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.util; + +import org.apache.ratis.thirdparty.io.grpc.CallOptions; +import org.apache.ratis.thirdparty.io.grpc.Channel; +import org.apache.ratis.thirdparty.io.grpc.ClientCall; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall; +import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener; +import org.apache.ratis.thirdparty.io.grpc.Metadata; +import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Consumer; + +/** + * Invoke the given notifier when receiving a response. + */ +public class ResponseNotifyClientInterceptor implements ClientInterceptor { + public static final Logger LOG = LoggerFactory.getLogger(ResponseNotifyClientInterceptor.class); + + private final Consumer<Object> notifier; + + public ResponseNotifyClientInterceptor(Consumer<Object> notifier) { + this.notifier = notifier; + } + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( + MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { + LOG.debug("callOptions {}", callOptions); + return new Call<>(next.newCall(method, callOptions)); + } + + private final class Call<ReqT, RespT> + extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> { + + private Call(ClientCall<ReqT, RespT> delegate) { + super(delegate); + } + + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + LOG.debug("start {}", headers); + super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { + @Override + public void onMessage(RespT message) { + LOG.debug("onMessage {}", message); + notifier.accept(message); + super.onMessage(message); + } + }, headers); + } + } +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java new file mode 100644 index 000000000..2b875f3ed --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.util; + +import org.apache.ratis.protocol.exceptions.TimeoutIOException; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.TimeoutExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.IntSupplier; + +public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { + public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class); + + public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout, + Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) { + final AtomicInteger responseCount = new AtomicInteger(); + final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor( + r -> responseCount.getAndIncrement()); + return new StreamObserverWithTimeout<>( + name, timeout, responseCount::get, newStreamObserver.apply(interceptor)); + } + + private final String name; + private final TimeDuration timeout; + private final StreamObserver<T> observer; + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); + + private final AtomicBoolean isClose = new AtomicBoolean(); + private final AtomicInteger requestCount = new AtomicInteger(); + private final IntSupplier responseCount; + + private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount, + StreamObserver<T> observer) { + this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name; + this.timeout = timeout; + this.responseCount = responseCount; + this.observer = observer; + } + + @Override + public void onNext(T request) { + observer.onNext(request); + final int id = requestCount.incrementAndGet(); + scheduler.onTimeout(timeout, () -> handleTimeout(id, request), + LOG, () -> name + ": Timeout check failed for request: " + request); + } + + private void handleTimeout(int id, T request) { + if (id > responseCount.getAsInt()) { + onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request)); + } + } + + @Override + public void onError(Throwable throwable) { + if (isClose.compareAndSet(false, true)) { + observer.onError(throwable); + } + } + + @Override + public void onCompleted() { + if (isClose.compareAndSet(false, true)) { + observer.onCompleted(); + } + } + + @Override + public String toString() { + return name; + } +} diff --git a/ratis-proto/src/main/proto/Test.proto b/ratis-proto/src/main/proto/Test.proto new file mode 100644 index 000000000..8d5769ff3 --- /dev/null +++ b/ratis-proto/src/main/proto/Test.proto @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.ratis.test.proto"; +option java_outer_classname = "TestProto"; + +package org.apache.ratis.test; + +service Greeter { + rpc Hello (stream HelloRequest) + returns (stream HelloReply) {} +} + +message HelloRequest { + string name = 1; +} + +message HelloReply { + string message = 1; +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java new file mode 100644 index 000000000..0923b27fe --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.util; + +import org.apache.ratis.test.proto.GreeterGrpc; +import org.apache.ratis.test.proto.GreeterGrpc.GreeterStub; +import org.apache.ratis.test.proto.HelloReply; +import org.apache.ratis.test.proto.HelloRequest; +import org.apache.ratis.thirdparty.io.grpc.Deadline; +import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; +import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +/** gRPC client for testing */ +class GrpcTestClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(GrpcTestClient.class); + + @FunctionalInterface + interface StreamObserverFactory + extends BiFunction<GreeterStub, StreamObserver<HelloReply>, StreamObserver<HelloRequest>> { + } + + static StreamObserverFactory withDeadline(TimeDuration timeout) { + final Deadline d = Deadline.after(timeout.getDuration(), timeout.getUnit()); + return (stub, responseHandler) -> stub.withDeadline(d).hello(responseHandler); + } + + static StreamObserverFactory withTimeout(TimeDuration timeout) { + return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, + i -> stub.withInterceptors(i).hello(responseHandler)); + } + + private final ManagedChannel channel; + private final StreamObserver<HelloRequest> requestHandler; + private final Queue<CompletableFuture<String>> replies = new ConcurrentLinkedQueue<>(); + + GrpcTestClient(String host, int port, StreamObserverFactory factory) { + this.channel = ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .build(); + + final GreeterStub asyncStub = GreeterGrpc.newStub(channel); + final StreamObserver<HelloReply> responseHandler = new StreamObserver<HelloReply>() { + @Override + public void onNext(HelloReply helloReply) { + replies.poll().complete(helloReply.getMessage()); + } + + @Override + public void onError(Throwable throwable) { + LOG.info("onError", throwable); + completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + LOG.info("onCompleted"); + completeExceptionally(new IllegalStateException("onCompleted")); + } + + void completeExceptionally(Throwable throwable) { + replies.forEach(f -> f.completeExceptionally(throwable)); + replies.clear(); + } + }; + + this.requestHandler = factory.apply(asyncStub, responseHandler); + } + + @Override + public void close() throws IOException { + try { + /* After the request handler is cancelled, no more life-cycle hooks are allowed, + * see {@link org.apache.ratis.thirdparty.io.grpc.ClientCall.Listener#cancel(String, Throwable)} */ + // requestHandler.onCompleted(); + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException("Failed to close", e); + } + } + + CompletableFuture<String> send(String name) { + LOG.info("send {}", name); + final HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + final CompletableFuture<String> f = new CompletableFuture<>(); + try { + requestHandler.onNext(request); + replies.offer(f); + } catch (IllegalStateException e) { + // already closed + f.completeExceptionally(e); + } + return f; + } +} \ No newline at end of file diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java new file mode 100644 index 000000000..ec9d63b13 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.util; + +import org.apache.ratis.test.proto.GreeterGrpc; +import org.apache.ratis.test.proto.HelloReply; +import org.apache.ratis.test.proto.HelloRequest; +import org.apache.ratis.thirdparty.io.grpc.Server; +import org.apache.ratis.thirdparty.io.grpc.ServerBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** gRPC server for testing */ +class GrpcTestServer implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(GrpcTestServer.class); + + private final Server server; + + GrpcTestServer(int port, int slow, TimeDuration timeout) { + this.server = ServerBuilder.forPort(port) + .addService(new GreeterImpl(slow, timeout)) + .build(); + } + + int start() throws IOException { + server.start(); + return server.getPort(); + } + + @Override + public void close() throws IOException { + try { + server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException("Failed to close", e); + } + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + static String toReplySuffix(String request) { + return ") Hello " + request; + } + + private final int slow; + private final TimeDuration shortSleepTime; + private final TimeDuration longSleepTime; + private int count = 0; + + GreeterImpl(int slow, TimeDuration timeout) { + this.slow = slow; + this.shortSleepTime = timeout.multiply(0.25); + this.longSleepTime = timeout.multiply(2); + } + + @Override + public StreamObserver<HelloRequest> hello(StreamObserver<HelloReply> responseObserver) { + return new StreamObserver<HelloRequest>() { + @Override + public void onNext(HelloRequest helloRequest) { + final String reply = count + toReplySuffix(helloRequest.getName()); + final TimeDuration sleepTime = count < slow ? shortSleepTime : longSleepTime; + LOG.info("count = {}, slow = {}, sleep {}", reply, slow, sleepTime); + try { + sleepTime.sleep(); + } catch (InterruptedException e) { + responseObserver.onError(e); + return; + } + responseObserver.onNext(HelloReply.newBuilder().setMessage(reply).build()); + count++; + } + + @Override + public void onError(Throwable throwable) { + LOG.error("onError", throwable); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + } +} \ No newline at end of file diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java new file mode 100644 index 000000000..dac58812d --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.util; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.grpc.util.GrpcTestClient.StreamObserverFactory; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; +import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.Slf4jUtils; +import org.apache.ratis.util.StringUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.event.Level; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +public class TestStreamObserverWithTimeout extends BaseTest { + { + Slf4jUtils.setLogLevel(ResponseNotifyClientInterceptor.LOG, Level.TRACE); + } + + enum Type { + WithDeadline(GrpcTestClient::withDeadline), + WithTimeout(GrpcTestClient::withTimeout); + + private final Function<TimeDuration, StreamObserverFactory> factory; + + Type(Function<TimeDuration, StreamObserverFactory> function) { + this.factory = function; + } + + StreamObserverFactory createFunction(TimeDuration timeout) { + return factory.apply(timeout); + } + } + + @Test + public void testWithDeadline() throws Exception { + //the total sleep time is within the deadline + runTestTimeout(2, Type.WithDeadline); + } + + @Test + public void testWithDeadlineFailure() { + //Expected to have DEADLINE_EXCEEDED + testFailureCase("total sleep time is longer than the deadline", + () -> runTestTimeout(5, Type.WithDeadline), + ExecutionException.class, StatusRuntimeException.class); + } + + @Test + public void testWithTimeout() throws Exception { + //Each sleep time is within the timeout, + //Note that the total sleep time is longer than the timeout, but it does not matter. + runTestTimeout(5, Type.WithTimeout); + } + + void runTestTimeout(int slow, Type type) throws Exception { + LOG.info("slow = {}, {}", slow, type); + final TimeDuration timeout = ONE_SECOND.multiply(0.5); + final StreamObserverFactory function = type.createFunction(timeout); + final InetSocketAddress address = NetUtils.createLocalServerAddress(); + + final List<String> messages = new ArrayList<>(); + for (int i = 0; i < 2 * slow; i++) { + messages.add("m" + i); + } + try (GrpcTestServer server = new GrpcTestServer(address.getPort(), slow, timeout)) { + final int port = server.start(); + try (GrpcTestClient client = new GrpcTestClient(address.getHostName(), port, function)) { + + final List<CompletableFuture<String>> futures = new ArrayList<>(); + for (String m : messages) { + if (type == Type.WithTimeout) { + timeout.sleep(); + } + futures.add(client.send(m)); + } + + int i = 0; + for (; i < slow; i++) { + final String expected = i + GrpcTestServer.GreeterImpl.toReplySuffix(messages.get(i)); + final String reply = futures.get(i).get(); + Assert.assertEquals("expected = " + expected + " != reply = " + reply, expected, reply); + LOG.info("{}) passed", i); + } + + for (; i < messages.size(); i++) { + final CompletableFuture<String> f = futures.get(i); + try { + final String reply = f.get(); + Assert.fail(i + ") reply = " + reply + ", " + + StringUtils.completableFuture2String(f, false)); + } catch (ExecutionException e) { + LOG.info("GOOD! {}) {}, {}", i, StringUtils.completableFuture2String(f, true), e); + } + } + } + } + } +}
