This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6f538a79d [#1466] feat(server): introduce the JvmPauseMonitor to
detect the gc pause (#1470)
6f538a79d is described below
commit 6f538a79daabd0a065f1e0f19188ea72963b12aa
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 19 10:06:53 2024 +0800
[#1466] feat(server): introduce the JvmPauseMonitor to detect the gc pause
(#1470)
### What changes were proposed in this pull request?
Introduce the JvmPauseMonitor to detect the gc pause
### Why are the changes needed?
Fix: #1466
The shuffle server runs some periodic check logic, and a long GC pause can
sometimes cause pre-allocated buffers to be removed.
With this monitor, we can confirm such pauses as the cause instead of guessing,
although JVM metrics can also reveal them.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
---
.../uniffle/common/util/JvmPauseMonitor.java | 219 +++++++++++++++++++++
.../org/apache/uniffle/server/ShuffleServer.java | 10 +
2 files changed, 229 insertions(+)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java
b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java
new file mode 100644
index 000000000..59f3b54c0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+
+/**
+ * Class which sets up a simple thread which runs in a loop sleeping for a short interval of time.
+ * If the sleep takes significantly longer than its target time, it implies that the JVM or host
+ * machine has paused processing, which may cause other problems. If such a pause is detected, the
+ * thread logs a message.
+ *
+ * <p>Thresholds are read from {@link RssConf}: pauses longer than {@code
+ * jvm.pause.info-threshold.ms} (default 1000) are logged at INFO, and pauses longer than {@code
+ * jvm.pause.warn-threshold.ms} (default 10000) are logged at WARN.
+ *
+ * <p>The statistics exposed by the getters are written only by the monitor thread and are {@code
+ * volatile}, so callers on other threads observe current, untorn values.
+ */
+public class JvmPauseMonitor implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+
+  /** The target sleep time */
+  private static final long SLEEP_INTERVAL_MS = 500;
+
+  /** log WARN if we detect a pause longer than this threshold */
+  private final long warnThresholdMs;
+
+  private static final String WARN_THRESHOLD_KEY = "jvm.pause.warn-threshold.ms";
+  private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+  /** log INFO if we detect a pause longer than this threshold */
+  private final long infoThresholdMs;
+
+  private static final String INFO_THRESHOLD_KEY = "jvm.pause.info-threshold.ms";
+  private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+  // These counters are written only by the monitor thread (single writer) and read by other
+  // threads through the getters. volatile provides visibility and atomic 64-bit reads/writes;
+  // the non-atomic read-modify-write (++ / +=) is safe because there is exactly one writer.
+  private volatile long numGcWarnThresholdExceeded = 0;
+  private volatile long numGcInfoThresholdExceeded = 0;
+  private volatile long totalGcExtraSleepTime = 0;
+
+  private volatile Thread monitorThread;
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Creates a monitor whose thresholds are taken from the given configuration.
+   *
+   * @param rssConf configuration holding the optional threshold keys
+   */
+  public JvmPauseMonitor(RssConf rssConf) {
+    this.warnThresholdMs = rssConf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.infoThresholdMs = rssConf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+  }
+
+  /**
+   * Starts the background daemon thread that detects pauses.
+   *
+   * <p>Only the first call starts a thread. Later calls log a warning and return, so a second
+   * thread is never leaked. The monitor cannot be restarted after {@link #close()}.
+   */
+  public synchronized void start() {
+    if (monitorThread != null) {
+      LOG.warn("JVM pause monitor is already started, ignoring duplicate start()");
+      return;
+    }
+    monitorThread = new Daemon(new Monitor());
+    monitorThread.start();
+  }
+
+  /** @return true once {@link #start()} has been called */
+  public boolean isStarted() {
+    return monitorThread != null;
+  }
+
+  /** @return how many detected pauses exceeded the WARN threshold */
+  public long getNumGcWarnThresholdExceeded() {
+    return numGcWarnThresholdExceeded;
+  }
+
+  /** @return how many detected pauses exceeded the INFO (but not the WARN) threshold */
+  public long getNumGcInfoThresholdExceeded() {
+    return numGcInfoThresholdExceeded;
+  }
+
+  /** @return the accumulated time, in ms, by which sleeps overran their target interval */
+  public long getTotalGcExtraSleepTime() {
+    return totalGcExtraSleepTime;
+  }
+
+  /**
+   * Builds the log message for a detected pause, listing every GC bean whose collection count
+   * changed between the two snapshots.
+   */
+  private String formatMessage(
+      long extraSleepTime,
+      Map<String, GcTimes> gcTimesAfterSleep,
+      Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    // Only beans present in both snapshots can be diffed.
+    Set<String> gcBeanNames =
+        Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+    for (String name : gcBeanNames) {
+      GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
+      if (diff.gcCount != 0) {
+        gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString());
+      }
+    }
+
+    String ret =
+        "Detected pause in JVM or host machine (eg GC): "
+            + "pause of approximately "
+            + extraSleepTime
+            + "ms\n";
+    if (gcDiffs.isEmpty()) {
+      ret += "No GCs detected";
+    } else {
+      ret += StringUtils.join(gcDiffs, "\n");
+    }
+    return ret;
+  }
+
+  /** Snapshots the cumulative count and time of every GC bean, keyed by bean name. */
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = new HashMap<>();
+    List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      map.put(gcBean.getName(), new GcTimes(gcBean));
+    }
+    return map;
+  }
+
+  /**
+   * Stops the monitor thread, if started, and waits for it to exit. If the calling thread is
+   * interrupted while waiting, its interrupt flag is restored.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    shouldRun = false;
+    if (monitorThread != null) {
+      // Interrupting wakes the monitor from Thread.sleep so shutdown is prompt.
+      monitorThread.interrupt();
+      try {
+        monitorThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Immutable pair of a GC bean's collection count and accumulated collection time. */
+  private static class GcTimes {
+    private final long gcCount;
+    private final long gcTimeMillis;
+
+    private GcTimes(GarbageCollectorMXBean gcBean) {
+      gcCount = gcBean.getCollectionCount();
+      gcTimeMillis = gcBean.getCollectionTime();
+    }
+
+    private GcTimes(long count, long time) {
+      this.gcCount = count;
+      this.gcTimeMillis = time;
+    }
+
+    private GcTimes subtract(GcTimes other) {
+      return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+    }
+  }
+
+  /**
+   * Loop body of the monitor thread. Each iteration sleeps for {@code SLEEP_INTERVAL_MS} and
+   * treats any time beyond that as a pause.
+   */
+  private class Monitor implements Runnable {
+    @Override
+    public void run() {
+      StopWatch sw = new StopWatch();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+      LOG.info("Starting JVM pause monitor");
+      while (shouldRun) {
+        sw.reset().start();
+        try {
+          Thread.sleep(SLEEP_INTERVAL_MS);
+        } catch (InterruptedException ie) {
+          // close() interrupts this thread to stop it; exit the loop.
+          return;
+        }
+        long extraSleepTime = sw.now(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+        if (extraSleepTime > warnThresholdMs) {
+          ++numGcWarnThresholdExceeded;
+          LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        } else if (extraSleepTime > infoThresholdMs) {
+          ++numGcInfoThresholdExceeded;
+          LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        }
+        totalGcExtraSleepTime += extraSleepTime;
+        gcTimesBeforeSleep = gcTimesAfterSleep;
+      }
+    }
+
+    @Override
+    public String toString() {
+      // Hadoop's Daemon(Runnable) constructor names the thread after runnable.toString(), so this
+      // gives the monitor thread a recognizable name in thread dumps.
+      return "JvmPauseMonitor";
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * <p>This main function just leaks memory into a list. Running this class with a 1GB heap will
+   * very quickly go into "GC hell" and result in log messages about the GC pauses.
+   *
+   * @param args args.
+   * @throws Exception Exception.
+   */
+  @SuppressWarnings("resource")
+  public static void main(String[] args) throws Exception {
+    JvmPauseMonitor monitor = new JvmPauseMonitor(new RssConf());
+    monitor.start();
+    List<String> list = Lists.newArrayList();
+    int i = 0;
+    while (true) {
+      list.add(String.valueOf(i++));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 9b296c98f..358208097 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -49,6 +49,7 @@ import org.apache.uniffle.common.security.SecurityConfig;
import org.apache.uniffle.common.security.SecurityContextFactory;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.JvmPauseMonitor;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
@@ -99,6 +100,7 @@ public class ShuffleServer {
private Future<?> decommissionFuture;
private boolean nettyServerEnabled;
private StreamServer streamServer;
+ private JvmPauseMonitor jvmPauseMonitor;
public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
@@ -197,11 +199,19 @@ public class ShuffleServer {
if (shuffleTaskManager != null) {
shuffleTaskManager.stop();
}
+ if (jvmPauseMonitor != null) {
+ jvmPauseMonitor.close();
+ }
running = false;
LOG.info("RPC Server Stopped!");
}
private void initialization() throws Exception {
+ // setup jvm pause monitor
+ final JvmPauseMonitor monitor = new JvmPauseMonitor(shuffleServerConf);
+ monitor.start();
+ this.jvmPauseMonitor = monitor;
+
boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
if (!testMode