This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push: new 2664ac8e5 RATIS-2244. Reduce the number of log messages during bootstrap (#1217) 2664ac8e5 is described below commit 2664ac8e50bf52202b581555134cbdf33c057603 Author: venkatsambath <svenkataramana...@cloudera.com> AuthorDate: Sun Feb 2 23:13:34 2025 -0500 RATIS-2244. Reduce the number of log messages during bootstrap (#1217) --- .../org/apache/ratis/client/impl/OrderedAsync.java | 2 +- .../main/java/org/apache/ratis/util/BatchLogger.java | 14 +++++++------- .../apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++++---- .../ratis/grpc/server/GrpcServerProtocolService.java | 20 ++++++++++++++++++-- .../server/impl/SnapshotInstallationHandler.java | 20 ++++++++++++-------- 5 files changed, 44 insertions(+), 22 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index 09c6cd4ac..1e21b171b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -213,7 +213,7 @@ public final class OrderedAsync { final Throwable exception = e; final String key = client.getId() + "-" + request.getCallId() + "-" + exception; final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}", suffix, client.getId(), request, exception); - BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op); + BatchLogger.print(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op); handleException(pending, request, e); return null; }); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java index 38dad5c49..b57bed704 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java @@ -45,9 +45,9 @@ public final class 
BatchLogger { private static final class UniqueId { private final Key key; - private final String name; + private final Object name; - private UniqueId(Key key, String name) { + private UniqueId(Key key, Object name) { this.key = Objects.requireNonNull(key, "key == null"); this.name = name; } @@ -99,15 +99,15 @@ public final class BatchLogger { private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance(); private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = new ConcurrentHashMap<>(); - public static void warn(Key key, String name, Consumer<String> op) { - warn(key, name, op, key.getBatchDuration(), true); + public static void print(Key key, Object name, Consumer<String> op) { + print(key, name, op, key.getBatchDuration(), true); } - public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration) { - warn(key, name, op, batchDuration, true); + public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration) { + print(key, name, op, batchDuration, true); } - public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) { + public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) { if (!shouldBatch || batchDuration.isNonPositive()) { op.accept(""); return; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 1544975a4..89e0b4634 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -66,6 +66,7 @@ public class GrpcLogAppender extends LogAppenderBase { private enum BatchLogKey implements BatchLogger.Key { RESET_CLIENT, + INCONSISTENCY_REPLY, APPEND_LOG_RESPONSE_HANDLER_ON_ERROR } @@ -217,7 +218,7 @@ public class GrpcLogAppender 
extends LogAppenderBase { .orElseGet(f::getMatchIndex); if (event.isError() && request == null) { final long followerNextIndex = f.getNextIndex(); - BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix -> + BatchLogger.print(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix -> LOG.warn("{}: Follower failed (request=null, errorCount={}); keep nextIndex ({}) unchanged and retry.{}", this, errorCount, followerNextIndex, suffix), logMessageBatchDuration); return; @@ -534,8 +535,9 @@ public class GrpcLogAppender extends LogAppenderBase { break; case INCONSISTENCY: grpcServerMetrics.onRequestInconsistency(getFollowerId().toString()); - LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={}", - this, reply.getResult(), reply.getNextIndex(), errorCount, request); + BatchLogger.print(BatchLogKey.INCONSISTENCY_REPLY, getFollower().getName() + "_" + reply.getNextIndex(), + suffix -> LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={} {}", + this, reply.getResult(), reply.getNextIndex(), errorCount, request, suffix)); final long requestFirstIndex = request != null? 
request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX; updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex())); break; @@ -555,7 +557,7 @@ public class GrpcLogAppender extends LogAppenderBase { LOG.info("{} is already stopped", GrpcLogAppender.this); return; } - BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name, + BatchLogger.print(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name, suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + suffix, t), logMessageBatchDuration, t instanceof StatusRuntimeException); grpcServerMetrics.onRequestRetry(); // Update try counter diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 451d74c64..7e17cb3cf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -31,6 +31,8 @@ import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; +import org.apache.ratis.util.BatchLogger; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReferenceCountedObject; import org.slf4j.Logger; @@ -49,6 +51,11 @@ import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getAppen class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); + private enum BatchLogKey implements BatchLogger.Key { + COMPLETED_REQUEST, + COMPLETED_REPLY + } + static class 
PendingServerRequest<REQUEST> { private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef; private final CompletableFuture<Void> future = new CompletableFuture<>(); @@ -76,6 +83,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> { private final RaftServer.Op op; + private final Supplier<String> nameSupplier; private final StreamObserver<REPLY> responseObserver; /** For ordered {@link #onNext(Object)} requests. */ private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>(); @@ -86,9 +94,14 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) { this.op = op; + this.nameSupplier = MemoizedSupplier.valueOf(() -> getId() + "_" + op); this.responseObserver = responseObserver; } + String getName() { + return nameSupplier.get(); + } + private String getPreviousRequestString() { return Optional.ofNullable(previousOnNext.get()) .map(PendingServerRequest::getRequest) @@ -197,9 +210,12 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { @Override public void onCompleted() { if (isClosed.compareAndSet(false, true)) { - LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString()); + BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(), + suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}", + getId(), op, getPreviousRequestString(), suffix)); requestFuture.get().thenAccept(reply -> { - LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply); + BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(), + suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, reply, suffix)); responseObserver.onCompleted(); }); releaseLast(); diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 70027e6dd..4f1ac4177 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -36,6 +36,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.util.BatchLogger; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.Preconditions; @@ -59,6 +60,11 @@ import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX; class SnapshotInstallationHandler { static final Logger LOG = LoggerFactory.getLogger(SnapshotInstallationHandler.class); + private enum BatchLogKey implements BatchLogger.Key { + INSTALL_SNAPSHOT_REQUEST, + INSTALL_SNAPSHOT_REPLY + } + static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, INVALID_LOG_INDEX); private final RaftServerImpl server; @@ -93,10 +99,9 @@ class SnapshotInstallationHandler { } InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { - if (LOG.isInfoEnabled()) { - LOG.info("{}: receive installSnapshot: {}", getMemberId(), - ServerStringUtils.toInstallSnapshotRequestString(request)); - } + BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(), + suffix -> LOG.info("{}: receive installSnapshot: {} {}", + getMemberId(), ServerStringUtils.toInstallSnapshotRequestString(request), suffix)); final InstallSnapshotReplyProto reply; try { reply = installSnapshotImpl(request); @@ -104,10 +109,9 @@ class SnapshotInstallationHandler { LOG.error("{}: installSnapshot failed", 
getMemberId(), e); throw e; } - if (LOG.isInfoEnabled()) { - LOG.info("{}: reply installSnapshot: {}", getMemberId(), - ServerStringUtils.toInstallSnapshotReplyString(reply)); - } + BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, getMemberId(), + suffix -> LOG.info("{}: reply installSnapshot: {} {}", + getMemberId(), ServerStringUtils.toInstallSnapshotReplyString(reply), suffix)); return reply; }