This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch snapshot-branch2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8efe8a110c750e4434babe25b25f570ae46d07b2
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Aug 27 04:56:28 2023 -0700

    RATIS-1877. In JvmPauseMonitor, sleepTime should not be larger than sleepDeviationThreshold (#907)
---
 .../org/apache/ratis/util/JvmPauseMonitor.java     | 55 +++++++++++++++++++---
 .../java/org/apache/ratis/util/TimeDuration.java   |  9 ++++
 .../apache/ratis/server/impl/RaftServerProxy.java  |  8 ++--
 3 files changed, 63 insertions(+), 9 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
index fac3d0ede..2e84c1122 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -32,8 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-public class JvmPauseMonitor {
-  public static final Logger LOG = 
LoggerFactory.getLogger(JvmPauseMonitor.class);
+/**
+ * Detect pauses in JVM caused by GC or other problems in the machine.
+ */
+public final class JvmPauseMonitor {
+  static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
   private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   static final class GcInfo {
@@ -87,16 +90,54 @@ public class JvmPauseMonitor {
         + extraSleepTime.toString(TimeUnit.SECONDS, 3) + gc;
   }
 
-  private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS);
+  /** To build {@link JvmPauseMonitor}. */
+  public static class Builder {
+    private Object name = "default";
+    private TimeDuration sleepDeviationThreshold = TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS);
+    private TimeDuration sleepTime = sleepDeviationThreshold;
+    private CheckedConsumer<TimeDuration, IOException> handler = t -> {};
+
+    public Builder setName(Object name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder setSleepTime(TimeDuration sleepTime) {
+      this.sleepTime = sleepTime;
+      return this;
+    }
+
+    public Builder setSleepDeviationThreshold(TimeDuration 
sleepDeviationThreshold) {
+      this.sleepDeviationThreshold = sleepDeviationThreshold;
+      return this;
+    }
+
+    public Builder setHandler(CheckedConsumer<TimeDuration, IOException> 
handler) {
+      this.handler = handler;
+      return this;
+    }
+
+    public JvmPauseMonitor build() {
+      return new JvmPauseMonitor(name, sleepTime, sleepDeviationThreshold, 
handler);
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private final TimeDuration sleepTime;
   private final TimeDuration sleepDeviationThreshold;
 
   private final String name;
   private final AtomicReference<Thread> threadRef = new AtomicReference<>();
   private final CheckedConsumer<TimeDuration, IOException> handler;
 
-  public JvmPauseMonitor(Object name, TimeDuration sleepDeviationThreshold,
+  private JvmPauseMonitor(Object name, TimeDuration sleepTime, TimeDuration 
sleepDeviationThreshold,
       CheckedConsumer<TimeDuration, IOException> handler) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    // use min -- if the sleep time is too long, it may not be able to detect the given deviation.
+    this.sleepTime = TimeDuration.min(sleepTime, sleepDeviationThreshold);
     this.sleepDeviationThreshold = sleepDeviationThreshold;
     this.handler = handler;
   }
@@ -116,7 +157,7 @@ public class JvmPauseMonitor {
     final Map<String, GcInfo> before = getGcTimes();
     final TimeDuration extraSleep;
     try {
-      extraSleep = SLEEP_TIME.sleep();
+      extraSleep = sleepTime.sleep();
     } catch (InterruptedException ie) {
       return;
     }
@@ -140,7 +181,9 @@ public class JvmPauseMonitor {
   /** Start this monitor. */
   public void start() {
     final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> 
Daemon.newBuilder()
-        .setName("JvmPauseMonitor" + 
THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build());
+        .setName(JavaUtils.getClassSimpleName(getClass()) + 
THREAD_COUNT.getAndIncrement())
+        .setRunnable(this::run)
+        .build());
     Optional.of(threadRef.updateAndGet(previous -> 
Optional.ofNullable(previous).orElseGet(supplier)))
         .filter(t -> supplier.isInitialized())
         .ifPresent(Thread::start);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 98fb3694e..10c5c6bd1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
+import java.util.stream.Stream;
 
 /**
  * Time duration is represented by a long together with a {@link TimeUnit}.
@@ -59,6 +60,14 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
     return ordinal == timeUnits.length - 1? unit: timeUnits[ordinal + 1];
   }
 
+  /** @return the minimum of the given parameters. */
+  public static TimeDuration min(TimeDuration left, TimeDuration right) {
+    Objects.requireNonNull(left, "left == null");
+    Objects.requireNonNull(right, "right == null");
+    return Stream.of(left, right).min(TimeDuration::compareTo).orElseThrow(
+        () -> new IllegalStateException("Failed to compute min(" + left + ", " 
+ right + ")"));
+  }
+
   /** Abbreviations of {@link TimeUnit}. */
   public enum Abbreviation {
     NANOSECONDS("ns", "nanos"),
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index bceeb44a1..7057f53d6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -73,7 +73,7 @@ import java.util.stream.Stream;
 class RaftServerProxy implements RaftServer {
   /**
    * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
-   *
+   * <p>
    * The map is synchronized for mutations and the bulk {@link 
#getGroupIds()}/{@link #getAll()} methods
    * but the (non-bulk) {@link #get(RaftGroupId)} and {@link 
#containsGroup(RaftGroupId)} methods are not.
    * The thread safety and atomicity guarantees for the non-bulk methods are 
provided by {@link ConcurrentMap}.
@@ -221,8 +221,10 @@ class RaftServerProxy implements RaftServer {
     final TimeDuration sleepDeviationThreshold = 
RaftServerConfigKeys.sleepDeviationThreshold(properties);
     final TimeDuration rpcSlownessTimeout = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties);
     final TimeDuration leaderStepDownWaitTime = 
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
-    this.pauseMonitor = new JvmPauseMonitor(id, sleepDeviationThreshold,
-        extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, 
leaderStepDownWaitTime));
+    this.pauseMonitor = JvmPauseMonitor.newBuilder().setName(id)
+        .setSleepDeviationThreshold(sleepDeviationThreshold)
+        .setHandler(extraSleep -> handleJvmPause(extraSleep, 
rpcSlownessTimeout, leaderStepDownWaitTime))
+        .build();
     this.threadGroup = threadGroup == null ? new 
ThreadGroup(this.id.toString()) : threadGroup;
   }
 

Reply via email to