This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee32b5  RATIS-1185. Avoid directly using RaftServerProxy in 
JvmPauseMonitor. (#304)
5ee32b5 is described below

commit 5ee32b57ae929cc60f2217bbb09314d594ffb2f8
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Nov 29 09:05:22 2020 +0800

    RATIS-1185. Avoid directly using RaftServerProxy in JvmPauseMonitor. (#304)
---
 .../main/java/org/apache/ratis/util/JavaUtils.java |  13 --
 .../org/apache/ratis/util/JvmPauseMonitor.java     | 163 +++++++++++++++++++
 .../java/org/apache/ratis/util/TimeDuration.java   |  16 +-
 .../main/java/org/apache/ratis/util/Timestamp.java |   9 ++
 .../org/apache/ratis/server/JvmPauseMonitor.java   | 180 ---------------------
 .../apache/ratis/server/RaftServerConfigKeys.java  |   8 +-
 .../apache/ratis/server/impl/FollowerState.java    |  16 +-
 .../apache/ratis/server/impl/LeaderElection.java   |   6 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  13 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  19 ++-
 10 files changed, 224 insertions(+), 219 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 1511688..8cf895b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -255,17 +255,4 @@ public interface JavaUtils {
       throw new CompletionException(t);
     }
   }
-
-  static boolean sleep(long sleepMs, long thresholdMs) throws 
InterruptedException {
-    final Timestamp t = Timestamp.currentTime();
-    Thread.sleep(sleepMs);
-    final long elapsedMs = t.elapsedTimeMs();
-    if (elapsedMs - sleepMs > thresholdMs) {
-      LOG.warn("Unexpected long sleep: sleep({}ms) actually took {}ms which is 
over the threshold {}ms",
-          sleepMs, elapsedMs, thresholdMs);
-      return false;
-    }
-    return true;
-  }
-
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
new file mode 100644
index 0000000..cd92297
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryManagerMXBean;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Detects pauses of the JVM or the host machine (e.g. long GCs) by repeatedly sleeping
+ * for a fixed {@link #SLEEP_TIME} and measuring how much longer each sleep actually took.
+ * Every measured extra sleep time is passed to the handler given to the constructor;
+ * extra sleep times above {@link #WARN_THRESHOLD} are also logged together with GC statistics.
+ */
+public class JvmPauseMonitor {
+  public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+
+  /** A garbage collector's collection count and time, or the difference between two such snapshots. */
+  static final class GcInfo {
+    private final long count;
+    private final long timeMs;
+
+    private GcInfo(GarbageCollectorMXBean gcBean) {
+      this(gcBean.getCollectionCount(), gcBean.getCollectionTime());
+    }
+
+    private GcInfo(long count, long timeMs) {
+      this.count = count;
+      this.timeMs = timeMs;
+    }
+
+    /** @return a new {@link GcInfo} holding {@code this - that} for both the count and the time. */
+    GcInfo subtract(GcInfo that) {
+      return new GcInfo(this.count - that.count, this.timeMs - that.timeMs);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + count + " time=" + timeMs + "ms";
+    }
+  }
+
+  /** @return a snapshot of every garbage collector MXBean, keyed by the bean name. */
+  static Map<String, GcInfo> getGcTimes() {
+    return ManagementFactory.getGarbageCollectorMXBeans().stream()
+        .collect(Collectors.toMap(MemoryManagerMXBean::getName, GcInfo::new));
+  }
+
+  /**
+   * Build the warning message for a detected pause.
+   * A collector is reported only if it appears in both snapshots and its collection count changed;
+   * each reported collector gets its own line directly below the header line.
+   *
+   * @param beforeSleep the GC snapshot taken before sleeping.
+   * @param extraSleepTime how much longer than expected the sleep took.
+   * @param afterSleep the GC snapshot taken after sleeping.
+   * @return the multi-line message.
+   */
+  static String toString(Map<String, GcInfo> beforeSleep, TimeDuration extraSleepTime, Map<String, GcInfo> afterSleep) {
+    // No separator after the header: each detail line below starts with its own separator,
+    // so appending one here would produce an empty line between the header and the details.
+    final StringBuilder b = new StringBuilder("Detected pause in JVM or host machine (eg GC): pause of approximately ")
+        .append(extraSleepTime);
+
+    boolean detected = false;
+    for (Map.Entry<String, GcInfo> before : beforeSleep.entrySet()) {
+      final String name = before.getKey();
+      final GcInfo after = afterSleep.get(name);
+      if (after == null) {
+        continue; // not present in the second snapshot; nothing to compare
+      }
+      final GcInfo diff = after.subtract(before.getValue());
+      if (diff.count != 0) {
+        b.append(System.lineSeparator()).append("GC pool '").append(name)
+            .append("' had collection(s): ").append(diff);
+        detected = true;
+      }
+    }
+
+    if (!detected) {
+      b.append(System.lineSeparator()).append("No GCs detected");
+    }
+    return b.toString();
+  }
+
+  /** The target duration of each monitoring sleep. */
+  private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+  /** Extra sleep times above this threshold are logged as a warning. */
+  private static final TimeDuration WARN_THRESHOLD = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+  private final String name;
+  /** The running monitor thread, or null when the monitor is not running. */
+  private final AtomicReference<Thread> threadRef = new AtomicReference<>();
+  /** Receives every measured extra sleep time; it is invoked on the monitor thread. */
+  private final CheckedConsumer<TimeDuration, IOException> handler;
+
+  /**
+   * @param name used (together with the class name) to identify this monitor in log messages.
+   * @param handler invoked on the monitor thread with the extra sleep time of each monitoring sleep.
+   */
+  public JvmPauseMonitor(Object name, CheckedConsumer<TimeDuration, IOException> handler) {
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.handler = handler;
+  }
+
+  /** The monitor loop: keep detecting pauses until this thread is no longer the registered monitor thread. */
+  private void run() {
+    LOG.info("{}: Started", this);
+    try {
+      while (Thread.currentThread().equals(threadRef.get())) {
+        detectPause();
+      }
+    } finally {
+      LOG.info("{}: Stopped", this);
+    }
+  }
+
+  /** Sleep once, log a warning if the sleep overran by more than the threshold, then pass the overrun to the handler. */
+  private void detectPause() {
+    final Map<String, GcInfo> before = getGcTimes();
+    final TimeDuration extraSleep;
+    try {
+      extraSleep = SLEEP_TIME.sleep();
+    } catch (InterruptedException ie) {
+      // Interruption is the stop signal; run() re-checks threadRef to decide whether to continue.
+      return;
+    }
+
+    if (extraSleep.compareTo(WARN_THRESHOLD) > 0) {
+      final Map<String, GcInfo> after = getGcTimes();
+      LOG.warn("{}: {}", this, toString(before, extraSleep, after));
+    }
+
+    handle(extraSleep);
+  }
+
+  /** Invoke the handler; any failure is logged so that it cannot terminate the monitor thread. */
+  private void handle(TimeDuration extraSleep) {
+    try {
+      handler.accept(extraSleep);
+    } catch (Throwable t) {
+      LOG.error("{}: Failed to handle extra sleep {}", this, extraSleep, t);
+    }
+  }
+
+  /**
+   * Start this monitor.
+   * If the monitor is already running, this method has no effect.
+   */
+  public void start() {
+    final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run));
+    final Thread current = threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier));
+    // updateAndGet may apply its function again after a failed CAS. The supplier can therefore be
+    // initialized even when a concurrent caller's thread was installed, so start the thread only if
+    // it is the one created by this call. Otherwise both callers would start the same thread.
+    if (supplier.isInitialized() && current == supplier.get()) {
+      current.start();
+    }
+  }
+
+  /**
+   * Stop this monitor and wait for the monitor thread to terminate.
+   * When invoked from the monitor thread itself (e.g. by the handler), it neither interrupts nor waits:
+   * a thread joining itself would block forever, and the monitor loop exits on its own
+   * once the thread is no longer registered.
+   */
+  public void stop() {
+    final Thread previous = threadRef.getAndSet(null);
+    if (previous == null || previous == Thread.currentThread()) {
+      return;
+    }
+    previous.interrupt();
+    try {
+      previous.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 1c186ff..54d0dab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -299,15 +299,22 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
     return duration <= 0;
   }
 
-  /** Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}. 
*/
-  public void sleep() throws InterruptedException {
-    sleep(null);
+  /** The same as sleep(null). */
+  public TimeDuration sleep() throws InterruptedException {
+    return sleep(null);
   }
 
-  public void sleep(Consumer<Object> log) throws InterruptedException {
+  /**
+   * Performs a {@link TimeUnit#sleep(long)} using this {@link TimeDuration}.
+   *
+   * @param log If not null, use it to print log messages.
+   * @return the difference of the actual sleep time duration and this {@link 
TimeDuration}.
+   */
+  public TimeDuration sleep(Consumer<Object> log) throws InterruptedException {
     if (log != null) {
       log.accept(StringUtils.stringSupplierAsObject(() -> "Start sleeping " + 
this));
     }
+    final Timestamp start = Timestamp.currentTime();
     try {
       unit.sleep(duration);
       if (log != null) {
@@ -320,6 +327,7 @@ public final class TimeDuration implements 
Comparable<TimeDuration> {
       }
       throw ie;
     }
+    return start.elapsedTime().subtract(this);
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java 
b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 8ce45ae..ba5fb8c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -67,6 +67,15 @@ public final class Timestamp implements 
Comparable<Timestamp> {
   }
 
   /**
+   * Add a time duration to this timestamp.
+   *
+   * @param t the time period to be added.
+   * @return a new {@link Timestamp} whose value is calculated
+   *         by adding the given time duration (converted to nanoseconds) to this timestamp.
+   */
+  public Timestamp addTime(TimeDuration t) {
+    return new Timestamp(nanos + t.to(TimeUnit.NANOSECONDS).getDuration());
+  }
+
+  /**
    * @return the elapsed time in milliseconds.
    *         If the timestamp is a future time, the returned value is negative.
    */
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java 
b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
deleted file mode 100644
index 9470110..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server;
-
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.thirdparty.com.google.common.base.Joiner;
-import org.apache.ratis.thirdparty.com.google.common.base.Stopwatch;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
-import org.apache.ratis.thirdparty.com.google.common.collect.Maps;
-import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
-public class JvmPauseMonitor {
-  public static final Logger LOG =
-    LoggerFactory.getLogger(JvmPauseMonitor.class);
-  /**
-   * The target sleep time
-   */
-  private static final long SLEEP_INTERVAL_MS = 500;
-
-  private Thread monitorThread;
-  private volatile boolean shouldRun = true;
-  private final RaftServerProxy proxy;
-
-  private String formatMessage(long extraSleepTime,
-                               Map<String, GcTimes> gcTimesAfterSleep,
-                               Map<String, GcTimes> gcTimesBeforeSleep) {
-
-    Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(),
-      gcTimesBeforeSleep.keySet());
-    List<String> gcDiffs = Lists.newArrayList();
-    for (String name : gcBeanNames) {
-      GcTimes diff = gcTimesAfterSleep.get(name)
-                       .subtract(gcTimesBeforeSleep.get(name));
-      if (diff.gcCount != 0) {
-        gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff);
-      }
-    }
-
-    String ret = "Detected pause in JVM or host machine (eg GC): "
-                   + "pause of approximately " + extraSleepTime + "ms\n";
-    if (gcDiffs.isEmpty()) {
-      ret += "No GCs detected";
-    } else {
-      ret += Joiner.on("\n").join(gcDiffs);
-    }
-    return ret;
-  }
-
-  public JvmPauseMonitor(RaftServerProxy proxy) {
-    this.proxy = proxy;
-  }
-
-  private Map<String, GcTimes> getGcTimes() {
-    Map<String, GcTimes> map = Maps.newHashMap();
-    List<GarbageCollectorMXBean> gcBeans =
-      ManagementFactory.getGarbageCollectorMXBeans();
-    for (GarbageCollectorMXBean gcBean : gcBeans) {
-      map.put(gcBean.getName(), new GcTimes(gcBean));
-    }
-    return map;
-  }
-
-  private static final class GcTimes {
-    private GcTimes(GarbageCollectorMXBean gcBean) {
-      gcCount = gcBean.getCollectionCount();
-      gcTimeMillis = gcBean.getCollectionTime();
-    }
-
-    private GcTimes(long count, long time) {
-      this.gcCount = count;
-      this.gcTimeMillis = time;
-    }
-
-    private GcTimes subtract(GcTimes other) {
-      return new GcTimes(this.gcCount - other.gcCount,
-        this.gcTimeMillis - other.gcTimeMillis);
-    }
-
-    @Override
-    public String toString() {
-      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
-    }
-
-    private long gcCount;
-    private long gcTimeMillis;
-  }
-
-  class Monitor implements Runnable {
-    private final String name = JvmPauseMonitor.this + "-" + JavaUtils
-                                                               
.getClassSimpleName(getClass());
-
-    public void run() {
-      int leaderStepDownWaitTime =
-        
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(proxy.getProperties()).toIntExact(TimeUnit
-                                                                               
                        .MILLISECONDS);
-      int rpcSlownessTimeoutMs = 
RaftServerConfigKeys.Rpc.slownessTimeout(proxy.getProperties()).toIntExact(
-        TimeUnit.MILLISECONDS);
-      Stopwatch sw = Stopwatch.createUnstarted();
-      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
-      LOG.info("Starting Ratis JVM pause monitor");
-      while (shouldRun) {
-        sw.reset().start();
-        try {
-          Thread.sleep(SLEEP_INTERVAL_MS);
-        } catch (InterruptedException ie) {
-          return;
-        }
-        long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - 
SLEEP_INTERVAL_MS;
-        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
-
-        if (extraSleepTime > 100) {
-          LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, 
gcTimesBeforeSleep));
-        }
-        try {
-          if (extraSleepTime > rpcSlownessTimeoutMs) {
-            // close down all pipelines if the total gc period exceeds
-            // rpc slowness timeout
-            proxy.close();
-          } else if (extraSleepTime > leaderStepDownWaitTime) {
-            proxy.getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
-          }
-        } catch (IOException ioe) {
-          LOG.info("Encountered exception in JvmPauseMonitor {}", ioe);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-  }
-
-
-  public void start() {
-    monitorThread = new Daemon(new Monitor());
-    monitorThread.start();
-  }
-
-  public void stop() {
-    shouldRun = false;
-    if (monitorThread != null) {
-      monitorThread.interrupt();
-      try {
-        monitorThread.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e652408..9543380 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -62,10 +62,10 @@ public interface RaftServerConfigKeys {
   }
 
   String SLEEP_DEVIATION_THRESHOLD_KEY = PREFIX + ".sleep.deviation.threshold";
-  int SLEEP_DEVIATION_THRESHOLD_DEFAULT = 300;
-  static int sleepDeviationThreshold(RaftProperties properties) {
-    return getInt(properties::getInt, SLEEP_DEVIATION_THRESHOLD_KEY,
-        SLEEP_DEVIATION_THRESHOLD_DEFAULT, getDefaultLog());
+  TimeDuration SLEEP_DEVIATION_THRESHOLD_DEFAULT = TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS);
+  static TimeDuration sleepDeviationThreshold(RaftProperties properties) {
+    return 
getTimeDuration(properties.getTimeDuration(SLEEP_DEVIATION_THRESHOLD_DEFAULT.getUnit()),
+        SLEEP_DEVIATION_THRESHOLD_KEY, SLEEP_DEVIATION_THRESHOLD_DEFAULT, 
getDefaultLog());
   }
   static void setSleepDeviationThreshold(RaftProperties properties, int 
thresholdMs) {
     setInt(properties::setInt, SLEEP_DEVIATION_THRESHOLD_KEY, thresholdMs);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 7c08a0e..b3173bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -109,11 +109,14 @@ class FollowerState extends Daemon {
 
   @Override
   public  void run() {
-    long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
+    final TimeDuration sleepDeviationThreshold = 
server.getSleepDeviationThreshold();
     while (isRunning && server.isFollower()) {
-      final long electionTimeout = server.getRandomTimeoutMs();
+      final TimeDuration electionTimeout = server.getRandomElectionTimeout();
       try {
-        if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
+        final TimeDuration extraSleep = electionTimeout.sleep();
+        if (extraSleep.compareTo(sleepDeviationThreshold) > 0) {
+          LOG.warn("Unexpected long sleep: sleep {} but took extra {} (> 
threshold = {})",
+              electionTimeout, extraSleep, sleepDeviationThreshold);
           continue;
         }
 
@@ -123,10 +126,11 @@ class FollowerState extends Daemon {
           break;
         }
         synchronized (server) {
-          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= 
electionTimeout
+          if (outstandingOp.get() == 0
+              && lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
               && !lostMajorityHeartbeatsRecently()) {
-            LOG.info("{}: change to CANDIDATE, lastRpcTime:{}ms, 
electionTimeout:{}ms",
-                this, lastRpcTime.elapsedTimeMs(), electionTimeout);
+            LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, 
electionTimeout:{}",
+                this, lastRpcTime.elapsedTime(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // 
Update timeout metric counters.
             // election timeout, should become a candidate
             server.changeToCandidate();
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index dbbefa8..1a31d66 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -285,7 +285,7 @@ class LeaderElection implements Runnable {
 
   private ResultAndTerm waitForResults(final long electionTerm, final int 
submitted,
       RaftConfiguration conf, Executor voteExecutor) throws 
InterruptedException {
-    final Timestamp timeout = 
Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
+    final Timestamp timeout = 
Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
     final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
     final List<Exception> exceptions = new ArrayList<>();
     int waitForNum = submitted;
@@ -336,9 +336,7 @@ class LeaderElection implements Runnable {
 
         // remove higher priority peer, so that we check higherPriorityPeers 
empty to make sure
         // all higher priority peers have replied
-        if (higherPriorityPeers.contains(replierId)) {
-          higherPriorityPeers.remove(replierId);
-        }
+        higherPriorityPeers.remove(replierId);
 
         if (r.getServerReply().getSuccess()) {
           votedPeers.add(replierId);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a1dcc36..1393b0e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,7 +91,7 @@ public class RaftServerImpl implements RaftServer.Division,
   private final int maxTimeoutMs;
   private final TimeDuration leaderStepDownWaitTime;
   private final int rpcSlownessTimeoutMs;
-  private final int sleepDeviationThresholdMs;
+  private final TimeDuration sleepDeviationThreshold;
   private final boolean installSnapshotEnabled;
 
   private final LifeCycle lifeCycle;
@@ -128,7 +128,7 @@ public class RaftServerImpl implements RaftServer.Division,
     maxTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
     rpcSlownessTimeoutMs = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
     leaderStepDownWaitTime = 
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
-    sleepDeviationThresholdMs = 
RaftServerConfigKeys.sleepDeviationThreshold(properties);
+    this.sleepDeviationThreshold = 
RaftServerConfigKeys.sleepDeviationThreshold(properties);
     installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
@@ -168,16 +168,17 @@ public class RaftServerImpl implements 
RaftServer.Division,
     return rpcSlownessTimeoutMs;
   }
 
-  int getRandomTimeoutMs() {
-    return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - 
minTimeoutMs + 1);
+  TimeDuration getRandomElectionTimeout() {
+    final long millis = minTimeoutMs + 
ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+    return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
   }
 
   TimeDuration getLeaderStepDownWaitTime() {
     return leaderStepDownWaitTime;
   }
 
-  int getSleepDeviationThresholdMs() {
-    return sleepDeviationThresholdMs;
+  TimeDuration getSleepDeviationThreshold() {
+    return sleepDeviationThreshold;
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f1dbe2f..b18b07b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,7 +35,7 @@ import 
org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.JvmPauseMonitor;
+import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.statemachine.StateMachine;
@@ -44,6 +44,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedFunction;
 
 import java.io.Closeable;
@@ -168,6 +169,7 @@ public class RaftServerProxy implements RaftServer {
 
   private final ImplMap impls = new ImplMap();
   private final ExecutorService implExecutor = 
Executors.newSingleThreadExecutor();
+
   private final JvmPauseMonitor pauseMonitor;
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
@@ -184,7 +186,20 @@ public class RaftServerProxy implements RaftServer {
     this.lifeCycle = new LifeCycle(this.id + "-" + 
JavaUtils.getClassSimpleName(getClass()));
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, 
parameters).getServerRpc();
-    this.pauseMonitor = new JvmPauseMonitor(this);
+
+    final TimeDuration rpcSlownessTimeout = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+    final TimeDuration leaderStepDownWaitTime = 
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
+    this.pauseMonitor = new JvmPauseMonitor(id,
+        extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, 
leaderStepDownWaitTime));
+  }
+
+  /**
+   * Handle a JVM pause reported by the {@link JvmPauseMonitor}.
+   * If the extra sleep time exceeds {@code closeThreshold}, this server is closed;
+   * otherwise, if it exceeds {@code stepDownThreshold}, {@code stepDownOnJvmPause}
+   * is invoked on each {@link RaftServerImpl} from {@code getImpls()}.
+   *
+   * NOTE(review): this is called on the pause monitor's own thread (it is the monitor's handler);
+   * confirm that close() does not wait for that same thread to terminate.
+   *
+   * @param extraSleep how much longer than expected the monitor's sleep took.
+   * @param closeThreshold the rpc slowness timeout passed in by the constructor.
+   * @param stepDownThreshold the leader step-down wait time passed in by the constructor.
+   */
+  private void handleJvmPause(TimeDuration extraSleep, TimeDuration 
closeThreshold, TimeDuration stepDownThreshold)
+      throws IOException {
+    if (extraSleep.compareTo(closeThreshold) > 0) {
+      close();
+    } else if (extraSleep.compareTo(stepDownThreshold) > 0) {
+      getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
+    }
   }
 
   /** Check the storage dir and add groups*/

Reply via email to