This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9be3f02a0 RATIS-1877. In JvmPauseMonitor, sleepTime should not be
larger than sleepDeviationThreshold (#907)
9be3f02a0 is described below
commit 9be3f02a0f06e96347839555b7b2ce1cd5b9f384
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Aug 27 04:56:28 2023 -0700
RATIS-1877. In JvmPauseMonitor, sleepTime should not be larger than
sleepDeviationThreshold (#907)
---
.../org/apache/ratis/util/JvmPauseMonitor.java | 55 +++++++++++++++++++---
.../java/org/apache/ratis/util/TimeDuration.java | 9 ++++
.../apache/ratis/server/impl/RaftServerProxy.java | 8 ++--
3 files changed, 63 insertions(+), 9 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
index fac3d0ede..2e84c1122 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -32,8 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-public class JvmPauseMonitor {
- public static final Logger LOG =
LoggerFactory.getLogger(JvmPauseMonitor.class);
+/**
+ * Detect pauses in JVM caused by GC or other problems in the machine.
+ */
+public final class JvmPauseMonitor {
+ static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
static final class GcInfo {
@@ -87,16 +90,54 @@ public class JvmPauseMonitor {
+ extraSleepTime.toString(TimeUnit.SECONDS, 3) + gc;
}
- private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS);
+ /**
+ * To build {@link JvmPauseMonitor}.
+ * <p>
+ * Defaults: the name is "default", the sleep deviation threshold is
+ * 300 milliseconds, the sleep time is initialized from that default
+ * threshold, and the handler does nothing.
+ * <p>
+ * Each setter stores the given value as-is and returns this builder so
+ * that calls can be chained. Setting the threshold does not update the
+ * sleep time stored in this builder. {@link #build()} passes the current
+ * name, sleep time, threshold and handler to the monitor constructor.
+ */
+ public static class Builder {
+ private Object name = "default";
+ private TimeDuration sleepDeviationThreshold = TimeDuration.valueOf(300,
TimeUnit.MILLISECONDS);
+ private TimeDuration sleepTime = sleepDeviationThreshold;
+ private CheckedConsumer<TimeDuration, IOException> handler = t -> {};
+
+ public Builder setName(Object name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder setSleepTime(TimeDuration sleepTime) {
+ this.sleepTime = sleepTime;
+ return this;
+ }
+
+ public Builder setSleepDeviationThreshold(TimeDuration
sleepDeviationThreshold) {
+ this.sleepDeviationThreshold = sleepDeviationThreshold;
+ return this;
+ }
+
+ public Builder setHandler(CheckedConsumer<TimeDuration, IOException>
handler) {
+ this.handler = handler;
+ return this;
+ }
+
+ public JvmPauseMonitor build() {
+ return new JvmPauseMonitor(name, sleepTime, sleepDeviationThreshold,
handler);
+ }
+ }
+
+ /** @return a new {@link Builder} holding the default settings. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private final TimeDuration sleepTime;
private final TimeDuration sleepDeviationThreshold;
private final String name;
private final AtomicReference<Thread> threadRef = new AtomicReference<>();
private final CheckedConsumer<TimeDuration, IOException> handler;
+ /**
+ * Construct a monitor; instances are created through {@link Builder}.
+ *
+ * @param name appended, after the simple class name and "-", to form
+ *             the name of this monitor.
+ * @param sleepTime the requested sleep time; the value actually stored
+ *                  is the minimum of this and the threshold.
+ * @param sleepDeviationThreshold the threshold, stored as given.
+ * @param handler the handler, stored as given.
+ */
- public JvmPauseMonitor(Object name, TimeDuration sleepDeviationThreshold,
+ private JvmPauseMonitor(Object name, TimeDuration sleepTime, TimeDuration
sleepDeviationThreshold,
CheckedConsumer<TimeDuration, IOException> handler) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+ // use min -- if the sleep time is too long, it may not be able to detect
the given deviation.
+ this.sleepTime = TimeDuration.min(sleepTime, sleepDeviationThreshold);
this.sleepDeviationThreshold = sleepDeviationThreshold;
this.handler = handler;
}
@@ -116,7 +157,7 @@ public class JvmPauseMonitor {
final Map<String, GcInfo> before = getGcTimes();
final TimeDuration extraSleep;
try {
- extraSleep = SLEEP_TIME.sleep();
+ extraSleep = sleepTime.sleep();
} catch (InterruptedException ie) {
return;
}
@@ -140,7 +181,9 @@ public class JvmPauseMonitor {
/** Start this monitor. */
public void start() {
final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() ->
Daemon.newBuilder()
- .setName("JvmPauseMonitor" +
THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build());
+ .setName(JavaUtils.getClassSimpleName(getClass()) +
THREAD_COUNT.getAndIncrement())
+ .setRunnable(this::run)
+ .build());
Optional.of(threadRef.updateAndGet(previous ->
Optional.ofNullable(previous).orElseGet(supplier)))
.filter(t -> supplier.isInitialized())
.ifPresent(Thread::start);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 98fb3694e..10c5c6bd1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
+import java.util.stream.Stream;
/**
* Time duration is represented by a long together with a {@link TimeUnit}.
@@ -59,6 +60,14 @@ public final class TimeDuration implements
Comparable<TimeDuration> {
return ordinal == timeUnits.length - 1? unit: timeUnits[ordinal + 1];
}
+ /**
+ * Return the smaller of the two given durations, compared using
+ * {@link #compareTo(TimeDuration)}.
+ * <p>
+ * The stream built here always contains both parameters, so the
+ * {@link IllegalStateException} fallback is not expected to be thrown.
+ *
+ * @param left the first duration, must not be null.
+ * @param right the second duration, must not be null.
+ * @return the minimum of the given parameters.
+ * @throws NullPointerException if either parameter is null.
+ */
+ public static TimeDuration min(TimeDuration left, TimeDuration right) {
+ Objects.requireNonNull(left, "left == null");
+ Objects.requireNonNull(right, "right == null");
+ return Stream.of(left, right).min(TimeDuration::compareTo).orElseThrow(
+ () -> new IllegalStateException("Failed to compute min(" + left + ", "
+ right + ")"));
+ }
+
/** Abbreviations of {@link TimeUnit}. */
public enum Abbreviation {
NANOSECONDS("ns", "nanos"),
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 89f49210e..f93120b3a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -74,7 +74,7 @@ import java.util.stream.Stream;
class RaftServerProxy implements RaftServer {
/**
* A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
- *
+ * <p>
* The map is synchronized for mutations and the bulk {@link
#getGroupIds()}/{@link #getAll()} methods
* but the (non-bulk) {@link #get(RaftGroupId)} and {@link
#containsGroup(RaftGroupId)} methods are not.
* The thread safety and atomicity guarantees for the non-bulk methods are
provided by {@link ConcurrentMap}.
@@ -222,8 +222,10 @@ class RaftServerProxy implements RaftServer {
final TimeDuration sleepDeviationThreshold =
RaftServerConfigKeys.sleepDeviationThreshold(properties);
final TimeDuration rpcSlownessTimeout =
RaftServerConfigKeys.Rpc.slownessTimeout(properties);
final TimeDuration leaderStepDownWaitTime =
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
- this.pauseMonitor = new JvmPauseMonitor(id, sleepDeviationThreshold,
- extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout,
leaderStepDownWaitTime));
+ this.pauseMonitor = JvmPauseMonitor.newBuilder().setName(id)
+ .setSleepDeviationThreshold(sleepDeviationThreshold)
+ .setHandler(extraSleep -> handleJvmPause(extraSleep,
rpcSlownessTimeout, leaderStepDownWaitTime))
+ .build();
this.threadGroup = threadGroup == null ? new
ThreadGroup(this.id.toString()) : threadGroup;
}