This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 8a226a33b09c9f2dd098411b830a6d1da7bb664b Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Tue Mar 28 22:57:41 2023 +0800 RATIS-1823. Improve error log in StreamObserverWithTimeout. (#864) --- .../java/org/apache/ratis/util/StringUtils.java | 10 ++---- .../apache/ratis/util/function/StringSupplier.java | 42 ++++++++++++++++++++++ .../grpc/server/GrpcServerProtocolClient.java | 5 +-- .../ratis/grpc/util/StreamObserverWithTimeout.java | 27 ++++++++------ .../org/apache/ratis/grpc/util/GrpcTestClient.java | 5 ++- 5 files changed, 69 insertions(+), 20 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index db1323f06..68c76ba99 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -20,6 +20,7 @@ package org.apache.ratis.util; import org.apache.ratis.thirdparty.com.google.common.collect.Interner; import org.apache.ratis.thirdparty.com.google.common.collect.Interners; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.function.StringSupplier; import java.io.PrintWriter; import java.io.StringWriter; @@ -138,13 +139,8 @@ public final class StringUtils { return stm.toString(); } - public static Object stringSupplierAsObject(Supplier<String> supplier) { - return new Object() { - @Override - public String toString() { - return supplier.get(); - } - }; + public static StringSupplier stringSupplierAsObject(Supplier<String> supplier) { + return StringSupplier.get(supplier); } public static <K, V> String map2String(Map<K, V> map) { diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java new file mode 100644 index 000000000..50bbb244e --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/function/StringSupplier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util.function; + +import java.util.function.Supplier; + +/** Supplier of {@link String}. */ +@FunctionalInterface +public interface StringSupplier extends Supplier<String> { + /** + * @return a {@link StringSupplier} which uses the given {@link Supplier} + * to override both {@link Supplier#get()} and {@link Object#toString()}. + */ + static StringSupplier get(Supplier<String> supplier) { + return new StringSupplier() { + @Override + public String get() { + return supplier.get(); + } + + @Override + public String toString() { + return supplier.get(); + } + }; + } +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index d1bb70728..34f014ebc 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -21,6 +21,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.util.StreamObserverWithTimeout; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; @@ -138,8 +139,8 @@ public class GrpcServerProtocolClient implements Closeable { StreamObserver<InstallSnapshotRequestProto> installSnapshot( String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) { - return StreamObserverWithTimeout.newInstance(name, timeout, limit, - i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler)); + return StreamObserverWithTimeout.newInstance(name, ServerStringUtils::toInstallSnapshotRequestString, + timeout, limit, i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler)); } // short-circuit the backoff timer and make them reconnect immediately. diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java index 8fa30b1cc..723a5dd99 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java @@ -24,6 +24,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.ResourceSemaphore; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; +import org.apache.ratis.util.function.StringSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,8 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class); public static <T> StreamObserverWithTimeout<T> newInstance( - String name, TimeDuration timeout, int outstandingLimit, + String name, Function<T, String> request2String, + TimeDuration timeout, int outstandingLimit, Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) { final AtomicInteger responseCount = new AtomicInteger(); final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null; @@ -46,11 +48,13 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { semaphore.release(); } }); - return new StreamObserverWithTimeout<>( - name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor)); + return new StreamObserverWithTimeout<>(name, request2String, + timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor)); } private final String name; + private final Function<T, String> requestToStringFunction; + private final TimeDuration timeout; private final StreamObserver<T> observer; private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); @@ -60,16 +64,18 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { private final IntSupplier responseCount; private final ResourceSemaphore semaphore; - private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount, - ResourceSemaphore semaphore, StreamObserver<T> observer) { + private StreamObserverWithTimeout(String name, Function<T, String> requestToStringFunction, + TimeDuration timeout, IntSupplier responseCount, ResourceSemaphore semaphore, StreamObserver<T> observer) { this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name; + this.requestToStringFunction = requestToStringFunction; + this.timeout = timeout; this.responseCount = responseCount; this.semaphore = semaphore; this.observer = observer; } - private void acquire(T request) { + private void acquire(StringSupplier request) { if (semaphore == null) { return; } @@ -88,14 +94,15 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { @Override public void onNext(T request) { - acquire(request); + final StringSupplier requestString = StringSupplier.get(() -> requestToStringFunction.apply(request)); + acquire(requestString); observer.onNext(request); final int id = requestCount.incrementAndGet(); - scheduler.onTimeout(timeout, () -> handleTimeout(id, request), - LOG, () -> name + ": Timeout check failed for request: " + request); + scheduler.onTimeout(timeout, () -> handleTimeout(id, requestString), + LOG, () -> name + ": Timeout check failed for request: " + requestString); } - private void handleTimeout(int id, T request) { + private void handleTimeout(int id, StringSupplier request) { if (id > responseCount.getAsInt()) { onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request)); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java index 7434b2d79..130c05eb9 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java @@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,9 @@ class GrpcTestClient implements Closeable { } static StreamObserverFactory withTimeout(TimeDuration timeout) { - return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2, + final String className = JavaUtils.getClassSimpleName(HelloRequest.class) + ":"; + return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", + r -> className + r.getName(), timeout, 2, i -> stub.withInterceptors(i).hello(responseHandler)); }
