This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 24ebb995b RATIS-1966. Batch frequent warn logs during gRPC exceptional
periods (#997)
24ebb995b is described below
commit 24ebb995b78b2c147d651acc467ede8c4e7cea32
Author: William Song <[email protected]>
AuthorDate: Fri Dec 29 15:07:15 2023 +0800
RATIS-1966. Batch frequent warn logs during gRPC exceptional periods (#997)
---
.../java/org/apache/ratis/util/BatchLogger.java | 117 +++++++++++++++++++++
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 13 +++
.../apache/ratis/grpc/server/GrpcLogAppender.java | 18 +++-
3 files changed, 145 insertions(+), 3 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
new file mode 100644
index 000000000..9ccd66ad7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/** For batching excessive log messages. */
+public final class BatchLogger {
+ private static final Logger LOG = LoggerFactory.getLogger(BatchLogger.class);
+
+ private BatchLogger() {
+ }
+
+ public interface Key {}
+
+ private static final class UniqueId {
+ private final Key key;
+ private final String name;
+
+ private UniqueId(Key key, String name) {
+ this.key = Objects.requireNonNull(key, "key == null");
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (!(obj instanceof UniqueId)) {
+ return false;
+ }
+
+ final UniqueId that = (UniqueId) obj;
+ return Objects.equals(this.key, that.key)
+ && Objects.equals(this.name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode() ^ name.hashCode();
+ }
+ }
+
+ private static final class BatchedLogEntry {
+ private Consumer<String> logOp;
+ private Timestamp startTime = Timestamp.currentTime();
+ private int count = 0;
+
+ private synchronized void execute() {
+ if (count <= 1) {
+ return;
+ }
+ logOp.accept(String.format(" (Repeated %d times in the last %s)",
+ count, startTime.elapsedTime().toString(TimeUnit.SECONDS, 3)));
+ startTime = null;
+ }
+
+ private synchronized boolean tryStartBatch(Consumer<String> op) {
+ if (startTime == null) { // already executed
+ op.accept("");
+ return false;
+ }
+ logOp = op;
+ count++;
+ return count == 1;
+ }
+ }
+
+ private static final TimeoutExecutor SCHEDULER =
TimeoutExecutor.getInstance();
+ private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE =
new ConcurrentHashMap<>();
+
+ public static void warn(Key key, String name, Consumer<String> op,
TimeDuration batchDuration) {
+ warn(key, name, op, batchDuration, true);
+ }
+
+ public static void warn(Key key, String name, Consumer<String> op,
TimeDuration batchDuration, boolean shouldBatch) {
+ if (!shouldBatch || batchDuration.isNonPositive()) {
+ op.accept("");
+ return;
+ }
+
+ final UniqueId id = new UniqueId(key, name);
+ final BatchedLogEntry entry = LOG_CACHE.computeIfAbsent(id, k -> new
BatchedLogEntry());
+
+ if (entry.tryStartBatch(op)) {
+ // print the first warn log on batch start
+ op.accept("");
+ SCHEDULER.onTimeout(batchDuration,
+ () ->
Optional.ofNullable(LOG_CACHE.remove(id)).ifPresent(BatchedLogEntry::execute),
+ LOG, () -> "print batched exception failed on " + op);
+ }
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index a20cdcd52..8caacfeeb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.apache.ratis.conf.ConfUtils.get;
@@ -279,6 +280,18 @@ public interface GrpcConfigKeys {
static void setHeartbeatChannel(RaftProperties properties, boolean
useSeparate) {
setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useSeparate);
}
+
+ String LOG_MESSAGE_BATCH_DURATION_KEY = PREFIX +
".log-message.batch.duration";
+ TimeDuration LOG_MESSAGE_BATCH_DURATION_DEFAULT = TimeDuration.valueOf(5,
TimeUnit.SECONDS);
+ static TimeDuration logMessageBatchDuration(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(LOG_MESSAGE_BATCH_DURATION_DEFAULT.getUnit()),
+ LOG_MESSAGE_BATCH_DURATION_KEY, LOG_MESSAGE_BATCH_DURATION_DEFAULT,
getDefaultLog());
+ }
+ static void setLogMessageBatchDuration(RaftProperties properties,
+ TimeDuration
logMessageBatchDuration) {
+ setTimeDuration(properties::setTimeDuration,
+ LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
+ }
}
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7adbc7355..9a2e84f21 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.leader.LogAppenderBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -61,6 +62,11 @@ import java.util.concurrent.atomic.AtomicLong;
public class GrpcLogAppender extends LogAppenderBase {
public static final Logger LOG =
LoggerFactory.getLogger(GrpcLogAppender.class);
+ private enum BatchLogKey implements BatchLogger.Key {
+ RESET_CLIENT,
+ APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
+ }
+
private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
// calculate diff in order to take care the possibility of numerical
overflow
final long diff = left - right;
@@ -143,6 +149,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeDuration installSnapshotStreamTimeout;
+ private final TimeDuration logMessageBatchDuration;
private final int maxOutstandingInstallSnapshots;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@@ -167,6 +174,7 @@ public class GrpcLogAppender extends LogAppenderBase {
this.maxOutstandingInstallSnapshots =
GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
this.installSnapshotStreamTimeout =
GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties)
.multiply(maxOutstandingInstallSnapshots);
+ this.logMessageBatchDuration =
GrpcConfigKeys.Server.logMessageBatchDuration(properties);
this.installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
@@ -204,8 +212,10 @@ public class GrpcLogAppender extends LogAppenderBase {
.map(TermIndex::getIndex)
.orElseGet(f::getMatchIndex);
if (event.isError() && request == null) {
- LOG.warn("{}: Follower failed (request=null, errorCount={}); keep
nextIndex ({}) unchanged and retry.",
- this, errorCount, f.getNextIndex());
+ final long followerNextIndex = f.getNextIndex();
+ BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" +
followerNextIndex, suffix ->
+ LOG.warn("{}: Follower failed (request=null, errorCount={}); keep
nextIndex ({}) unchanged and retry.{}",
+ this, errorCount, followerNextIndex, suffix),
logMessageBatchDuration);
return;
}
if (request != null && request.isHeartbeat()) {
@@ -523,7 +533,9 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
- GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
+ BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR,
AppendLogResponseHandler.this.name,
+ suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" +
suffix, t),
+ logMessageBatchDuration, t instanceof StatusRuntimeException);
grpcServerMetrics.onRequestRetry(); // Update try counter
AppendEntriesRequest request =
pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
resetClient(request, Event.ERROR);