This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ebb995b RATIS-1966. Batch frequent warn logs during gRPC exceptional 
periods (#997)
24ebb995b is described below

commit 24ebb995b78b2c147d651acc467ede8c4e7cea32
Author: William Song <[email protected]>
AuthorDate: Fri Dec 29 15:07:15 2023 +0800

    RATIS-1966. Batch frequent warn logs during gRPC exceptional periods (#997)
---
 .../java/org/apache/ratis/util/BatchLogger.java    | 117 +++++++++++++++++++++
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |  13 +++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  18 +++-
 3 files changed, 145 insertions(+), 3 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java 
b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
new file mode 100644
index 000000000..9ccd66ad7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/** For batching excessive log messages. */
+public final class BatchLogger {
+  private static final Logger LOG = LoggerFactory.getLogger(BatchLogger.class);
+
+  private BatchLogger() {
+  }
+
+  public interface Key {}
+
+  private static final class UniqueId {
+    private final Key key;
+    private final String name;
+
+    private UniqueId(Key key, String name) {
+      this.key = Objects.requireNonNull(key, "key == null");
+      this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (!(obj instanceof UniqueId)) {
+        return false;
+      }
+
+      final UniqueId that = (UniqueId) obj;
+      return Objects.equals(this.key, that.key)
+          && Objects.equals(this.name, that.name);
+    }
+
+    @Override
+    public int hashCode() {
+      return key.hashCode() ^ Objects.hashCode(name); // name may be null, as equals(..) permits
+    }
+  }
+
+  private static final class BatchedLogEntry {
+    private Consumer<String> logOp;
+    private Timestamp startTime = Timestamp.currentTime();
+    private int count = 0;
+
+    private synchronized void execute() {
+      if (count > 1) { // a lone message was already printed when its batch started
+        logOp.accept(String.format(" (Repeated %d times in the last %s)",
+            count, startTime.elapsedTime().toString(TimeUnit.SECONDS, 3)));
+      }
+      // Always mark as executed; otherwise a late tryStartBatch(..) on this removed entry is dropped.
+      startTime = null;
+    }
+
+    private synchronized boolean tryStartBatch(Consumer<String> op) {
+      if (startTime == null) { // already executed
+        op.accept("");
+        return false;
+      }
+      logOp = op;
+      count++;
+      return count == 1;
+    }
+  }
+
+  private static final TimeoutExecutor SCHEDULER = 
TimeoutExecutor.getInstance();
+  private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = 
new ConcurrentHashMap<>();
+
+  public static void warn(Key key, String name, Consumer<String> op, 
TimeDuration batchDuration) {
+    warn(key, name, op, batchDuration, true);
+  }
+
+  public static void warn(Key key, String name, Consumer<String> op, 
TimeDuration batchDuration, boolean shouldBatch) {
+    if (!shouldBatch || batchDuration.isNonPositive()) {
+      op.accept("");
+      return;
+    }
+
+    final UniqueId id = new UniqueId(key, name);
+    final BatchedLogEntry entry = LOG_CACHE.computeIfAbsent(id, k -> new 
BatchedLogEntry());
+
+    if (entry.tryStartBatch(op)) {
+      // print the first warn log on batch start
+      op.accept("");
+      SCHEDULER.onTimeout(batchDuration,
+          () -> 
Optional.ofNullable(LOG_CACHE.remove(id)).ifPresent(BatchedLogEntry::execute),
+          LOG, () -> "print batched exception failed on " + op);
+    }
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index a20cdcd52..8caacfeeb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.conf.ConfUtils.get;
@@ -279,6 +280,18 @@ public interface GrpcConfigKeys {
     static void setHeartbeatChannel(RaftProperties properties, boolean 
useSeparate) {
       setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useSeparate);
     }
+
+    String LOG_MESSAGE_BATCH_DURATION_KEY = PREFIX + 
".log-message.batch.duration";
+    TimeDuration LOG_MESSAGE_BATCH_DURATION_DEFAULT = TimeDuration.valueOf(5, 
TimeUnit.SECONDS);
+    static TimeDuration logMessageBatchDuration(RaftProperties properties) {
+      return 
getTimeDuration(properties.getTimeDuration(LOG_MESSAGE_BATCH_DURATION_DEFAULT.getUnit()),
+          LOG_MESSAGE_BATCH_DURATION_KEY, LOG_MESSAGE_BATCH_DURATION_DEFAULT, 
getDefaultLog());
+    }
+    static void setLogMessageBatchDuration(RaftProperties properties,
+                                           TimeDuration 
logMessageBatchDuration) {
+      setTimeDuration(properties::setTimeDuration,
+          LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
+    }
   }
 
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7adbc7355..9a2e84f21 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.leader.LogAppenderBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -61,6 +62,11 @@ import java.util.concurrent.atomic.AtomicLong;
 public class GrpcLogAppender extends LogAppenderBase {
   public static final Logger LOG = 
LoggerFactory.getLogger(GrpcLogAppender.class);
 
+  private enum BatchLogKey implements BatchLogger.Key {
+    RESET_CLIENT,
+    APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
+  }
+
   private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
     // calculate diff in order to take care the possibility of numerical 
overflow
     final long diff = left - right;
@@ -143,6 +149,7 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration installSnapshotStreamTimeout;
+  private final TimeDuration logMessageBatchDuration;
   private final int maxOutstandingInstallSnapshots;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
@@ -167,6 +174,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     this.maxOutstandingInstallSnapshots = 
GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
     this.installSnapshotStreamTimeout = 
GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties)
         .multiply(maxOutstandingInstallSnapshots);
+    this.logMessageBatchDuration = 
GrpcConfigKeys.Server.logMessageBatchDuration(properties);
     this.installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = 
GrpcConfigKeys.Server.heartbeatChannel(properties);
 
@@ -204,8 +212,10 @@ public class GrpcLogAppender extends LogAppenderBase {
           .map(TermIndex::getIndex)
           .orElseGet(f::getMatchIndex);
       if (event.isError() && request == null) {
-        LOG.warn("{}: Follower failed (request=null, errorCount={}); keep 
nextIndex ({}) unchanged and retry.",
-            this, errorCount, f.getNextIndex());
+        final long followerNextIndex = f.getNextIndex();
+        BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + 
followerNextIndex, suffix ->
+            LOG.warn("{}: Follower failed (request=null, errorCount={}); keep 
nextIndex ({}) unchanged and retry.{}",
+                this, errorCount, followerNextIndex, suffix), 
logMessageBatchDuration);
         return;
       }
       if (request != null && request.isHeartbeat()) {
@@ -523,7 +533,9 @@ public class GrpcLogAppender extends LogAppenderBase {
         LOG.info("{} is already stopped", GrpcLogAppender.this);
         return;
       }
-      GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
+      BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, 
AppendLogResponseHandler.this.name,
+          suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + 
suffix, t),
+          logMessageBatchDuration, t instanceof StatusRuntimeException);
       grpcServerMetrics.onRequestRetry(); // Update try counter
       AppendEntriesRequest request = 
pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
       resetClient(request, Event.ERROR);

Reply via email to