This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f538a79d [#1466] feat(server): introduce the JvmPauseMonitor to 
detect the gc pause (#1470)
6f538a79d is described below

commit 6f538a79daabd0a065f1e0f19188ea72963b12aa
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jan 19 10:06:53 2024 +0800

    [#1466] feat(server): introduce the JvmPauseMonitor to detect the gc pause 
(#1470)
    
    ### What changes were proposed in this pull request?
    
    Introduce the JvmPauseMonitor to detect the gc pause
    
    ### Why are the changes needed?
    
    Fix: #1466
    
    The shuffle server runs several periodic check loops, and a long GC pause
    can cause some pre-allocated buffers to be removed.
    With this monitor we can identify such a pause as the cause instead of guessing,
    although the JVM metrics can also be used for that purpose.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests
---
 .../uniffle/common/util/JvmPauseMonitor.java       | 219 +++++++++++++++++++++
 .../org/apache/uniffle/server/ShuffleServer.java   |  10 +
 2 files changed, 229 insertions(+)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java 
b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java
new file mode 100644
index 000000000..59f3b54c0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssConf;
+
+/**
+ * Class which sets up a simple thread which runs in a loop sleeping for a short interval of time.
+ * If the sleep takes significantly longer than its target time, it implies that the JVM or host
+ * machine has paused processing, which may cause other problems. If such a pause is detected, the
+ * thread logs a message.
+ *
+ * <p>The statistics exposed through the getters are written only by the single monitor thread and
+ * are declared {@code volatile} so that other threads reading them observe up-to-date values.
+ */
+public class JvmPauseMonitor implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
+
+  /** The target sleep time */
+  private static final long SLEEP_INTERVAL_MS = 500;
+
+  private static final String WARN_THRESHOLD_KEY = "jvm.pause.warn-threshold.ms";
+  private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+  private static final String INFO_THRESHOLD_KEY = "jvm.pause.info-threshold.ms";
+  private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+  /** log WARN if we detect a pause longer than this threshold */
+  private final long warnThresholdMs;
+
+  /** log INFO if we detect a pause longer than this threshold */
+  private final long infoThresholdMs;
+
+  // Written only by the monitor thread, read by arbitrary threads through the getters.
+  // volatile gives readers visibility; a single writer makes the ++ / += updates safe.
+  private volatile long numGcWarnThresholdExceeded = 0;
+  private volatile long numGcInfoThresholdExceeded = 0;
+  private volatile long totalGcExtraSleepTime = 0;
+
+  // volatile so that isStarted() and close() see the thread published by start().
+  private volatile Thread monitorThread;
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Creates a monitor whose thresholds are read from the given configuration.
+   *
+   * @param rssConf configuration providing {@code jvm.pause.warn-threshold.ms} (default 10000) and
+   *     {@code jvm.pause.info-threshold.ms} (default 1000).
+   */
+  public JvmPauseMonitor(RssConf rssConf) {
+    this.warnThresholdMs = rssConf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.infoThresholdMs = rssConf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+  }
+
+  /**
+   * Starts the daemon monitor thread.
+   *
+   * <p>Calling this method again after the monitor has been started is a no-op: a second thread
+   * would never be stopped by {@link #close()} and would update the counters concurrently.
+   */
+  public synchronized void start() {
+    if (monitorThread != null) {
+      LOG.warn("JVM pause monitor has already been started, ignoring repeated start request");
+      return;
+    }
+    Thread thread = new Daemon(new Monitor());
+    thread.start();
+    monitorThread = thread;
+  }
+
+  /**
+   * Returns whether {@link #start()} has been called.
+   *
+   * @return true once the monitor thread has been created.
+   */
+  public boolean isStarted() {
+    return monitorThread != null;
+  }
+
+  /**
+   * Returns how many detected pauses exceeded the WARN threshold.
+   *
+   * @return number of pauses longer than the WARN threshold.
+   */
+  public long getNumGcWarnThresholdExceeded() {
+    return numGcWarnThresholdExceeded;
+  }
+
+  /**
+   * Returns how many detected pauses exceeded the INFO threshold without exceeding the WARN
+   * threshold.
+   *
+   * @return number of pauses longer than the INFO threshold but not longer than the WARN one.
+   */
+  public long getNumGcInfoThresholdExceeded() {
+    return numGcInfoThresholdExceeded;
+  }
+
+  /**
+   * Returns the accumulated difference between the measured and the target sleep time.
+   *
+   * @return total extra sleep time in milliseconds, summed over all monitor iterations.
+   */
+  public long getTotalGcExtraSleepTime() {
+    return totalGcExtraSleepTime;
+  }
+
+  /**
+   * Builds the log message for a detected pause, listing every GC pool that performed at least
+   * one collection between the two snapshots.
+   *
+   * @param extraSleepTime how much longer than the target the sleep took, in milliseconds.
+   * @param gcTimesAfterSleep GC statistics captured after the sleep, keyed by GC bean name.
+   * @param gcTimesBeforeSleep GC statistics captured before the sleep, keyed by GC bean name.
+   * @return the formatted message.
+   */
+  private String formatMessage(
+      long extraSleepTime,
+      Map<String, GcTimes> gcTimesAfterSleep,
+      Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    // Only pools present in both snapshots can be diffed.
+    Set<String> gcBeanNames =
+        Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+    for (String name : gcBeanNames) {
+      GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
+      if (diff.gcCount != 0) {
+        gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff);
+      }
+    }
+
+    String ret =
+        "Detected pause in JVM or host machine (eg GC): "
+            + "pause of approximately "
+            + extraSleepTime
+            + "ms\n";
+    if (gcDiffs.isEmpty()) {
+      ret += "No GCs detected";
+    } else {
+      ret += StringUtils.join(gcDiffs, "\n");
+    }
+    return ret;
+  }
+
+  /**
+   * Captures the current collection count and time of every garbage collector MXBean.
+   *
+   * @return a new map from GC bean name to its current statistics.
+   */
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = new HashMap<>();
+    List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      map.put(gcBean.getName(), new GcTimes(gcBean));
+    }
+    return map;
+  }
+
+  /**
+   * Stops the monitor: clears the run flag, interrupts the monitor thread and waits for it to
+   * exit. If the calling thread is interrupted while waiting, its interrupt status is restored.
+   */
+  @Override
+  public void close() throws IOException {
+    shouldRun = false;
+    Thread thread = monitorThread;
+    if (thread != null) {
+      thread.interrupt();
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Immutable snapshot (or difference of two snapshots) of one collector's count and time. */
+  private static class GcTimes {
+    private final long gcCount;
+    private final long gcTimeMillis;
+
+    private GcTimes(GarbageCollectorMXBean gcBean) {
+      gcCount = gcBean.getCollectionCount();
+      gcTimeMillis = gcBean.getCollectionTime();
+    }
+
+    private GcTimes(long count, long time) {
+      this.gcCount = count;
+      this.gcTimeMillis = time;
+    }
+
+    /** Returns a new instance holding {@code this - other} for both count and time. */
+    private GcTimes subtract(GcTimes other) {
+      return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+    }
+  }
+
+  /** Loop executed by the monitor thread until the run flag is cleared or it is interrupted. */
+  private class Monitor implements Runnable {
+    @Override
+    public void run() {
+      StopWatch sw = new StopWatch();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+      LOG.info("Starting JVM pause monitor");
+      while (shouldRun) {
+        sw.reset().start();
+        try {
+          Thread.sleep(SLEEP_INTERVAL_MS);
+        } catch (InterruptedException ie) {
+          // close() interrupts this thread to stop it; simply exit.
+          return;
+        }
+        // Whatever the sleep took beyond the requested interval is attributed to a pause.
+        long extraSleepTime = sw.now(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+        if (extraSleepTime > warnThresholdMs) {
+          ++numGcWarnThresholdExceeded;
+          LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        } else if (extraSleepTime > infoThresholdMs) {
+          ++numGcInfoThresholdExceeded;
+          LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+        }
+        totalGcExtraSleepTime += extraSleepTime;
+        gcTimesBeforeSleep = gcTimesAfterSleep;
+      }
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * <p>This main function just leaks memory into a list. Running this class with a 1GB heap will
+   * very quickly go into "GC hell" and result in log messages about the GC pauses.
+   *
+   * @param args args.
+   * @throws Exception Exception.
+   */
+  @SuppressWarnings("resource")
+  public static void main(String[] args) throws Exception {
+    JvmPauseMonitor monitor = new JvmPauseMonitor(new RssConf());
+    monitor.start();
+    List<String> list = Lists.newArrayList();
+    int i = 0;
+    while (true) {
+      list.add(String.valueOf(i++));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 9b296c98f..358208097 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -49,6 +49,7 @@ import org.apache.uniffle.common.security.SecurityConfig;
 import org.apache.uniffle.common.security.SecurityContextFactory;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.JvmPauseMonitor;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
@@ -99,6 +100,7 @@ public class ShuffleServer {
   private Future<?> decommissionFuture;
   private boolean nettyServerEnabled;
   private StreamServer streamServer;
+  private JvmPauseMonitor jvmPauseMonitor;
 
   public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
     this.shuffleServerConf = shuffleServerConf;
@@ -197,11 +199,19 @@ public class ShuffleServer {
     if (shuffleTaskManager != null) {
       shuffleTaskManager.stop();
     }
+    if (jvmPauseMonitor != null) {
+      jvmPauseMonitor.close();
+    }
     running = false;
     LOG.info("RPC Server Stopped!");
   }
 
   private void initialization() throws Exception {
+    // setup jvm pause monitor
+    final JvmPauseMonitor monitor = new JvmPauseMonitor(shuffleServerConf);
+    monitor.start();
+    this.jvmPauseMonitor = monitor;
+
     boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
     String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
     if (!testMode

Reply via email to