This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4dfcd9b56 [CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker 
using JVMQuake
4dfcd9b56 is described below

commit 4dfcd9b56b9343594de682a219fd5ae2de7e3fd5
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 28 20:45:08 2023 +0800

    [CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake
    
    ### What changes were proposed in this pull request?
    
    Introduce JVM monitoring in Celeborn Worker using JVMQuake to enable early 
detection of memory management issues and facilitate fast failure.
    
    ### Why are the changes needed?
    
    When facing out-of-control memory management in Celeborn Worker we 
typically use JVMkill as a remedy by killing the process and generating a heap 
dump for post-analysis. However, even with jvmkill protection, we may still 
encounter issues caused by JVM running out of memory, such as repeated 
execution of Full GC without performing any useful work during the pause time. 
Since the JVM does not exhaust 100% of resources, JVMkill will not be 
triggered. Therefore JVMQuake is introduced to  [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `JVMQuakeSuite`
    
    Closes #2061 from SteNicholas/CELEBORN-1092.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 LICENSE-binary                                     |   2 +
 .../org/apache/celeborn/common/CelebornConf.scala  |  86 +++++++++
 .../org/apache/celeborn/common/util/Utils.scala    |   2 +
 dev/dependencies.sh                                |   8 +-
 dev/deps/dependencies-client-flink-1.14            |   1 +
 dev/deps/dependencies-client-flink-1.15            |   1 +
 dev/deps/dependencies-client-flink-1.17            |   1 +
 dev/deps/dependencies-client-flink-1.18            |   1 +
 dev/deps/dependencies-client-mr                    |   1 +
 dev/deps/dependencies-client-spark-2.4             |   1 +
 dev/deps/dependencies-client-spark-3.0             |   1 +
 dev/deps/dependencies-client-spark-3.1             |   1 +
 dev/deps/dependencies-client-spark-3.2             |   1 +
 dev/deps/dependencies-client-spark-3.3             |   1 +
 dev/deps/dependencies-client-spark-3.4             |   1 +
 dev/deps/dependencies-client-spark-3.5             |   1 +
 dev/deps/dependencies-server                       |   1 +
 docs/configuration/worker.md                       |   8 +
 licenses-binary/LISENCE-jdktools.txt               |  21 +++
 pom.xml                                            |   8 +
 project/CelebornBuild.scala                        |   4 +
 project/JDKTools.scala                             |  77 ++++++++
 .../celeborn/service/deploy/worker/Worker.scala    |  10 ++
 .../service/deploy/worker/monitor/JVMQuake.scala   | 197 +++++++++++++++++++++
 .../deploy/worker/monitor/JVMQuakeSuite.scala      |  69 ++++++++
 25 files changed, 501 insertions(+), 4 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 585c150c3..61838e78a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -304,6 +304,8 @@ javax.servlet:javax.servlet-api
 
 MIT License
 ------------
+See license/LISENCE-jdktools.txt for details.
+com.github.olivergondza:maven-jdk-tools-wrapper
 See license/LICENSE-slf4j.txt for details.
 org.slf4j:jul-to-slf4j
 org.slf4j:slf4j-api
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 247727df2..422d0f555 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -689,6 +689,21 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def workerFetchHeartbeatEnabled: Boolean = 
get(WORKER_FETCH_HEARTBEAT_ENABLED)
   def workerPartitionSplitEnabled: Boolean = 
get(WORKER_PARTITION_SPLIT_ENABLED)
   def workerActiveConnectionMax: Option[Long] = 
get(WORKER_ACTIVE_CONNECTION_MAX)
+  def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
+  def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
+  def workerJvmQuakeRuntimeWeight: Double = 
get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
+  def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED)
+  def workerJvmQuakeDumpPath: String = get(WORKER_JVM_QUAKE_DUMP_PATH)
+
+  def workerJvmQuakeDumpThreshold: Duration = // GC 'deficit' allowed before a heap dump
+    getTimeAsMs( // returns milliseconds, so the value must be wrapped as milliseconds
+      WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
+      WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).milliseconds
+  def workerJvmQuakeKillThreshold: Duration = // GC 'deficit' allowed before the kill action
+    getTimeAsMs( // returns milliseconds, so the value must be wrapped as milliseconds
+      WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
+      WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).milliseconds
+  def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)
 
   // //////////////////////////////////////////////////////
   //                 Metrics System                      //
@@ -2896,6 +2911,77 @@ object CelebornConf extends Logging {
       .longConf
       .createOptional
 
+  val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.jvmQuake.enabled")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("When true, Celeborn worker will start the jvm quake to monitor of 
gc behavior, " +
+        "which enables early detection of memory management issues and 
facilitates fast failure.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.worker.jvmQuake.check.interval")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("Interval of gc behavior checking for worker jvm quake.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
+  val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] =
+    buildConf("celeborn.worker.jvmQuake.runtimeWeight")
+      .categories("worker")
+      .version("0.4.0")
+      .doc(
+        "The factor by which to multiply running JVM time, when weighing it 
against GCing time. " +
+          "'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, 
and is compared against threshold " +
+          "to determine whether to take action.")
+      .doubleConf
+      .createWithDefault(5)
+
+  val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.worker.jvmQuake.dump.threshold")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("The threshold of heap dump for the maximum GC 'deficit' which can 
be accumulated before jvmquake takes action. " +
+        "Meanwhile, there is no heap dump generated when dump threshold is 
greater than kill threshold.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")
+
+  val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.worker.jvmQuake.kill.threshold")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("The threshold of system kill for the maximum GC 'deficit' which 
can be accumulated before jvmquake takes action.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("60s")
+
+  val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.jvmQuake.dump.enabled")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("Whether to heap dump for the maximum GC 'deficit' during worker 
jvm quake.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val WORKER_JVM_QUAKE_DUMP_PATH: ConfigEntry[String] =
+    buildConf("celeborn.worker.jvmQuake.dump.path")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("The path of heap dump for the maximum GC 'deficit' during worker 
jvm quake.")
+      .stringConf
+      .transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir"))
+        .replace("<pid>", Utils.getProcessId))
+      .createWithDefault(s"<tmp>/jvm-quake/dump/<pid>")
+
+  val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] =
+    buildConf("celeborn.worker.jvmQuake.exitCode")
+      .categories("worker")
+      .version("0.4.0")
+      .doc("The exit code of system kill for the maximum GC 'deficit' during 
worker jvm quake.")
+      .intConf
+      .createWithDefault(502) // NOTE(review): POSIX exit statuses are 8-bit, so 502 would presumably surface as 246 - confirm intended
+
   val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.client.application.heartbeatInterval")
       .withAlternative("celeborn.application.heartbeatInterval")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 5704f3f88..47c26ae09 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1091,4 +1091,6 @@ object Utils extends Logging {
     }
     labelPart(0).trim -> labelPart(1).trim
   }
+
+  def getProcessId: String = 
ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
 }
diff --git a/dev/dependencies.sh b/dev/dependencies.sh
index 73c6c41c4..ae44b2e45 100755
--- a/dev/dependencies.sh
+++ b/dev/dependencies.sh
@@ -56,9 +56,9 @@ function mvn_build_classpath() {
 }
 
 function sbt_build_client_classpath() {
-  PATTERN="$SBT_PROJECT / Runtime / externalDependencyClasspath"
+  PATTERN="$SBT_PROJECT / Runtime / managedClasspath"
   deps=$(
-    $SBT -P$MODULE "clean; export Runtime/externalDependencyClasspath" | \
+    $SBT -P$MODULE "clean; export Runtime/managedClasspath" | \
       awk -v pat="$PATTERN" '$0 ~ pat { found=1 } found { print }' | \
       awk 'NR==2' | \
       tr ":" "\n"
@@ -96,8 +96,8 @@ function sbt_build_client_classpath() {
 }
 
 function sbt_build_server_classpath() {
-  $SBT "error; clean; export externalDependencyClasspath" | \
-    awk '/externalDependencyClasspath/ { found=1 } found { print }' | \
+  $SBT "error; clean; export managedClasspath" | \
+    awk '/managedClasspath/ { found=1 } found { print }' | \
     awk 'NR % 2 == 0' | \
     # This will skip the last line 
     sed '$d' | \
diff --git a/dev/deps/dependencies-client-flink-1.14 
b/dev/deps/dependencies-client-flink-1.14
index 456250bb9..be6b7d17f 100644
--- a/dev/deps/dependencies-client-flink-1.14
+++ b/dev/deps/dependencies-client-flink-1.14
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-flink-1.15 
b/dev/deps/dependencies-client-flink-1.15
index 456250bb9..be6b7d17f 100644
--- a/dev/deps/dependencies-client-flink-1.15
+++ b/dev/deps/dependencies-client-flink-1.15
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-flink-1.17 
b/dev/deps/dependencies-client-flink-1.17
index 456250bb9..be6b7d17f 100644
--- a/dev/deps/dependencies-client-flink-1.17
+++ b/dev/deps/dependencies-client-flink-1.17
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-flink-1.18 
b/dev/deps/dependencies-client-flink-1.18
index 456250bb9..be6b7d17f 100644
--- a/dev/deps/dependencies-client-flink-1.18
+++ b/dev/deps/dependencies-client-flink-1.18
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-mr b/dev/deps/dependencies-client-mr
index cc8ef1ade..b3764b2ea 100644
--- a/dev/deps/dependencies-client-mr
+++ b/dev/deps/dependencies-client-mr
@@ -122,6 +122,7 @@ kerby-util/1.0.1//kerby-util-1.0.1.jar
 kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-2.4 
b/dev/deps/dependencies-client-spark-2.4
index c439d7e08..9beaa1f8b 100644
--- a/dev/deps/dependencies-client-spark-2.4
+++ b/dev/deps/dependencies-client-spark-2.4
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.4.0//lz4-java-1.4.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.0 
b/dev/deps/dependencies-client-spark-3.0
index 8ad78eccb..03618f785 100644
--- a/dev/deps/dependencies-client-spark-3.0
+++ b/dev/deps/dependencies-client-spark-3.0
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.7.1//lz4-java-1.7.1.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.1 
b/dev/deps/dependencies-client-spark-3.1
index 1cbd01b35..da8331b10 100644
--- a/dev/deps/dependencies-client-spark-3.1
+++ b/dev/deps/dependencies-client-spark-3.1
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.7.1//lz4-java-1.7.1.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.2 
b/dev/deps/dependencies-client-spark-3.2
index e83df4a71..1feb81a8e 100644
--- a/dev/deps/dependencies-client-spark-3.2
+++ b/dev/deps/dependencies-client-spark-3.2
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.7.1//lz4-java-1.7.1.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.3 
b/dev/deps/dependencies-client-spark-3.3
index 456250bb9..be6b7d17f 100644
--- a/dev/deps/dependencies-client-spark-3.3
+++ b/dev/deps/dependencies-client-spark-3.3
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.4 
b/dev/deps/dependencies-client-spark-3.4
index 35698f487..eabdee5a8 100644
--- a/dev/deps/dependencies-client-spark-3.4
+++ b/dev/deps/dependencies-client-spark-3.4
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-client-spark-3.5 
b/dev/deps/dependencies-client-spark-3.5
index 481caffcc..4aa62aef5 100644
--- a/dev/deps/dependencies-client-spark-3.5
+++ b/dev/deps/dependencies-client-spark-3.5
@@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 89ac60462..1be1e05dd 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -37,6 +37,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar
 log4j-core/2.17.2//log4j-core-2.17.2.jar
 log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar
 lz4-java/1.8.0//lz4-java-1.8.0.jar
+maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a40cc9c54..aa0e555de 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -70,6 +70,14 @@ license: |
 | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful 
shutdown timeout time. | 0.2.0 | 
 | celeborn.worker.http.host | &lt;localhost&gt; | Worker's http host. | 0.4.0 
| 
 | celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 | 
+| celeborn.worker.jvmQuake.check.interval | 1s | Interval of gc behavior 
checking for worker jvm quake. | 0.4.0 | 
+| celeborn.worker.jvmQuake.dump.enabled | true | Whether to heap dump for the 
maximum GC 'deficit' during worker jvm quake. | 0.4.0 | 
+| celeborn.worker.jvmQuake.dump.path | &lt;tmp&gt;/jvm-quake/dump/&lt;pid&gt; 
| The path of heap dump for the maximum GC 'deficit' during worker jvm quake. | 
0.4.0 | 
+| celeborn.worker.jvmQuake.dump.threshold | 30s | The threshold of heap dump 
for the maximum GC 'deficit' which can be accumulated before jvmquake takes 
action. Meanwhile, there is no heap dump generated when dump threshold is 
greater than kill threshold. | 0.4.0 | 
+| celeborn.worker.jvmQuake.enabled | false | When true, Celeborn worker will 
start the jvm quake to monitor of gc behavior, which enables early detection of 
memory management issues and facilitates fast failure. | 0.4.0 | 
+| celeborn.worker.jvmQuake.exitCode | 502 | The exit code of system kill for 
the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | 
+| celeborn.worker.jvmQuake.kill.threshold | 60s | The threshold of system kill 
for the maximum GC 'deficit' which can be accumulated before jvmquake takes 
action. | 0.4.0 | 
+| celeborn.worker.jvmQuake.runtimeWeight | 5.0 | The factor by which to 
multiply running JVM time, when weighing it against GCing time. 'Deficit' is 
accumulated as `gc_time - runtime * runtime_weight`, and is compared against 
threshold to determine whether to take action. | 0.4.0 | 
 | celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device 
monitor to check disk. | 0.3.0 | 
 | celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker 
check device status. | 0.3.0 | 
 | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type 
for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | 
diff --git a/licenses-binary/LISENCE-jdktools.txt 
b/licenses-binary/LISENCE-jdktools.txt
new file mode 100644
index 000000000..08cc1ab0a
--- /dev/null
+++ b/licenses-binary/LISENCE-jdktools.txt
@@ -0,0 +1,21 @@
+The MIT License
+
+Copyright (c) 2016 Red Hat, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/pom.xml b/pom.xml
index 34dfaa680..e1c96c704 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@
 
     <!-- for JDK-17 test-->
     <extraJavaTestArgs>-XX:+IgnoreUnrecognizedVMOptions
+      --add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
       --add-opens=java.base/java.lang=ALL-UNNAMED
       --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
       --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
@@ -1156,6 +1157,13 @@
       <properties>
         <java.version>8</java.version>
       </properties>
+      <dependencies>
+        <dependency>
+          <groupId>com.github.olivergondza</groupId>
+          <artifactId>maven-jdk-tools-wrapper</artifactId>
+          <version>0.1</version>
+        </dependency>
+      </dependencies>
       <build>
         <plugins>
           <!-- Based on 
https://github.com/google/error-prone/blob/f8e33bc460be82ab22256a7ef8b979d7a2cacaba/docs/installation.md#jdk-8
 -->
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index e940702cd..b4f56782a 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -50,6 +50,7 @@ object Dependencies {
   val junitVersion = "4.13.2"
   val leveldbJniVersion = "1.8"
   val log4j2Version = "2.17.2"
+  val jdkToolsVersion = "0.1"
   val metricsVersion = "3.2.6"
   val mockitoVersion = "4.11.0"
   val nettyVersion = "4.1.93.Final"
@@ -71,6 +72,7 @@ object Dependencies {
   val commonsIo = "commons-io" % "commons-io" % commonsIoVersion
   val commonsLang3 = "org.apache.commons" % "commons-lang3" % 
commonsLang3Version
   val commonsLogging = "commons-logging" % "commons-logging" % 
commonsLoggingVersion
+  val jdkTools = "com.github.olivergondza" % "maven-jdk-tools-wrapper" % 
jdkToolsVersion
   val findbugsJsr305 = "com.google.code.findbugs" % "jsr305" % findbugsVersion
   val guava = "com.google.guava" % "guava" % guavaVersion excludeAll(
     ExclusionRule("org.checkerframework", "checker-qual"),
@@ -167,6 +169,7 @@ object CelebornCommonSettings {
     Test / javaOptions ++= Seq(
       "-Xmx4g",
       "-XX:+IgnoreUnrecognizedVMOptions",
+      "--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED",
       "--add-opens=java.base/java.lang=ALL-UNNAMED",
       "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
       "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
@@ -322,6 +325,7 @@ object CelebornCommon {
         Dependencies.commonsLang3,
         Dependencies.hadoopClientApi,
         Dependencies.hadoopClientRuntime,
+        Dependencies.jdkTools,
         Dependencies.ratisClient,
         Dependencies.ratisCommon,
         Dependencies.leveldbJniAll,
diff --git a/project/JDKTools.scala b/project/JDKTools.scala
new file mode 100644
index 000000000..4dd2a5d61
--- /dev/null
+++ b/project/JDKTools.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.build
+
+import java.io.FileNotFoundException
+
+import sbt._
+import sbt.Keys._
+
+import scala.sys.process._
+
+import scala.util.Try
+
+/**
+ * sbt auto plugin that adds the JDK's lib/tools.jar to the Compile unmanaged jars when building on Java 8 or older; it adds no settings on Java 9+.
+ */
+object JDKTools extends AutoPlugin {
+  override def requires = plugins.JvmPlugin
+
+  override def trigger = allRequirements
+
+  override def projectSettings: Seq[Def.Setting[_]] = 
inConfig(Compile)(settings)
+
+  lazy val settings = baseScalaDebuggerToolsSettings
+
+  lazy val baseScalaDebuggerToolsSettings: Seq[Def.Setting[_]] =
+    if (System.getProperty("java.specification.version").startsWith("1."))
+      Seq(
+        // JDK Dependency (just for sbt, must exist on classpath for 
execution, cannot be redistributed)
+        unmanagedJars += {
+          Attributed.blank(JavaTools)
+        }
+      )
+    else
+    // on Java 9+, we don't need to do anything at all
+      Seq()
+
+  //
+  // NOTE: Taken from Ensime Server project (when under BSD 3-clause)
+  // 
https://github.com/ensime/ensime-server/blob/master/project/EnsimeBuild.scala
+  //
+  // WORKAROUND: https://github.com/typelevel/scala/issues/75
+  lazy val JavaTools: File = List(
+    // manual
+    sys.env.get("JAVA_HOME"),
+    sys.env.get("JDK_HOME"),
+    // osx
+    Try("/usr/libexec/java_home".!!).toOption,
+    // fallback
+    // sys.props.get("java.home") returns jre home for JDK8
+    sys.props.get("java.home").map(new File(_).getParent),
+    sys.props.get("java.home")
+  ).flatten.map { n =>
+    new File(n.trim + "/lib/tools.jar")
+  }.find(_.exists()).getOrElse(
+    throw new FileNotFoundException(
+      """
+        |Could not automatically find the JDK/lib/tools.jar.
+        |You must explicitly set JDK_HOME or JAVA_HOME.
+        """.stripMargin)
+  )
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d8c7049e5..ecf867b3e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -51,6 +51,7 @@ import 
org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, 
MemoryManager}
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
+import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
 import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
 
 private[celeborn] class Worker(
@@ -275,6 +276,12 @@ private[celeborn] class Worker(
   private val userResourceConsumptions =
     JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, 
Long)]()
 
+  private var jvmQuake: JVMQuake = _
+  if (conf.workerJvmQuakeEnabled) {
+    jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
+    jvmQuake.start()
+  }
+
   workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
     workerInfo.getShuffleKeySet.size
   }
@@ -431,6 +438,9 @@ private[celeborn] class Worker(
     if (!stopped) {
       logInfo("Stopping Worker.")
 
+      if (jvmQuake != null) {
+        jvmQuake.stop()
+      }
       if (sendHeartbeatTask != null) {
         if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
           sendHeartbeatTask.cancel(false)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala
new file mode 100644
index 000000000..a88a39e22
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.monitor
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import com.google.common.annotations.VisibleForTesting
+import com.sun.management.HotSpotDiagnosticMXBean
+import sun.jvmstat.monitor._
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+
+/**
+ * The JVM quake provides granular monitoring of GC behavior, which enables early detection of
+ * memory management issues and facilitates fast failure.
+ *
+ * Note: The principle is in alignment with GC instability detection algorithm for jvmquake
+ * project of Netflix: https://github.com/Netflix-Skunkworks/jvmquake.
+ *
+ * On every check the GC time accumulated since the previous check is added to a bucket, and the
+ * remaining time between the last GC exits, multiplied by the runtime weight, is removed from it
+ * (the bucket never drops below zero). When the bucket exceeds the dump threshold, a heap dump
+ * is attempted at most once per instance (if enabled). Once no heap dump is pending and the
+ * bucket also exceeds the kill threshold, the JVM exits with the configured exit code.
+ *
+ * @param conf Celeborn configuration with jvm quake config.
+ * @param uniqueId Identifier embedded in the heap dump file name; defaults to a random UUID.
+ */
+class JVMQuake(conf: CelebornConf, uniqueId: String = UUID.randomUUID().toString)
+  extends Logging {
+
+  import scala.util.control.NonFatal
+
+  import JVMQuake._
+
+  val dumpFile = s"worker-quake-heapdump-$uniqueId.hprof"
+  // Written by the "jvm-quake" scheduler thread and read by other threads through
+  // shouldHeapDump, hence volatile so that the update becomes visible to them.
+  @volatile var heapDumped: Boolean = false
+
+  private[this] val enabled = conf.workerJvmQuakeEnabled
+  private[this] val checkInterval = conf.workerJvmQuakeCheckInterval
+  private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight
+  private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos
+  private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos
+  private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled
+  private[this] val dumpPath = conf.workerJvmQuakeDumpPath
+  private[this] val exitCode = conf.workerJvmQuakeExitCode
+
+  private[this] var lastExitTime: Long = 0L
+  private[this] var lastGCTime: Long = 0L
+  private[this] var bucket: Long = 0L
+  private[this] var scheduler: ScheduledExecutorService = _
+
+  /**
+   * Records the current GC counters as baseline and schedules the periodic check on a daemon
+   * thread. Does nothing when the JVM quake is disabled.
+   */
+  def start(): Unit = {
+    if (enabled) {
+      lastExitTime = getLastExitTime
+      lastGCTime = getLastGCTime
+      scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake")
+      scheduler.scheduleWithFixedDelay(
+        new Runnable() {
+          override def run(): Unit = {
+            JVMQuake.this.run()
+          }
+        },
+        0,
+        checkInterval,
+        TimeUnit.MILLISECONDS)
+    }
+  }
+
+  /** Shuts down the periodic check. Safe to call when the monitor was never started. */
+  def stop(): Unit = {
+    // The scheduler only exists after start() ran with the JVM quake enabled.
+    if (scheduler != null) {
+      scheduler.shutdown()
+    }
+  }
+
+  /**
+   * One monitoring round: updates the bucket from the GC counters and triggers the heap dump or
+   * the JVM exit when the corresponding threshold is exceeded.
+   */
+  private def run(): Unit = {
+    // An exception escaping a task scheduled with scheduleWithFixedDelay suppresses all
+    // subsequent executions, which would silently disable the monitoring.
+    try {
+      val currentExitTime = getLastExitTime
+      val currentGCTime = getLastGCTime
+      val gcTime = currentGCTime - lastGCTime
+      val runTime = currentExitTime - lastExitTime - gcTime
+      // Advance the baseline right away so that a failure below cannot cause the same GC time
+      // to be counted again in the next round.
+      lastExitTime = currentExitTime
+      lastGCTime = currentGCTime
+
+      bucket = Math.max(0L, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong)
+      // NOTE(review): the counters are compared against nanosecond thresholds, so they are
+      // converted to milliseconds before being formatted as a millisecond duration - confirm
+      // the tick frequency of the GC counters.
+      val gcTimeMs = TimeUnit.NANOSECONDS.toMillis(gcTime)
+      val runTimeMs = TimeUnit.NANOSECONDS.toMillis(runTime)
+      logDebug(s"Time: (gc time: ${Utils.msDurationToString(gcTimeMs)}, " +
+        s"execution time: ${Utils.msDurationToString(runTimeMs)})")
+      logDebug(
+        s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, " +
+          s"kill threshold: $killThreshold)")
+
+      if (bucket > dumpThreshold) {
+        logError(
+          s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.")
+        if (shouldHeapDump) {
+          heapDump(getHeapDumpSavePath, getHeapDumpLinkPath)
+        } else if (bucket > killThreshold) {
+          logError(s"Exit JVM with $exitCode. JVM GC has reached the threshold: " +
+            s"bucket: $bucket, killThreshold: $killThreshold.")
+          System.exit(exitCode)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        logError("JVM quake check failed.", e)
+    }
+  }
+
+  /** Whether a heap dump is enabled and has not been attempted by this instance yet. */
+  def shouldHeapDump: Boolean = {
+    dumpEnabled && !heapDumped
+  }
+
+  /** Directory the heap dump file is written to. */
+  @VisibleForTesting
+  def getHeapDumpSavePath: String =
+    dumpPath
+
+  /** Path of the symbolic link that is created to point at the heap dump directory. */
+  @VisibleForTesting
+  def getHeapDumpLinkPath: String =
+    s"${new File(dumpPath).getParent}/link/${Utils.getProcessId}"
+
+  /**
+   * Dumps the heap into `savePath` and links `linkPath` to that directory.
+   *
+   * @param savePath Directory that receives the heap dump file; created if missing.
+   * @param linkPath Location of the symbolic link pointing to `savePath`.
+   * @param live Whether only live (reachable) objects are dumped.
+   */
+  private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = {
+    val saveDir = new File(savePath)
+    if (!saveDir.exists()) {
+      saveDir.mkdirs()
+    }
+    val heapDump = new File(saveDir, dumpFile)
+    if (heapDump.exists()) {
+      // Each worker process only generates one heap dump. Skip when heap dump of worker
+      // already exists.
+      logWarning(s"Skip because heap dump of worker already exists: $heapDump.")
+      heapDumped = true
+    } else {
+      try {
+        logInfo(s"Starting heap dump: $heapDump.")
+        ManagementFactory.newPlatformMXBeanProxy(
+          ManagementFactory.getPlatformMBeanServer,
+          "com.sun.management:type=HotSpotDiagnostic",
+          classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live)
+        linkHeapDump(saveDir, linkPath)
+        logInfo(s"Finished heap dump: $dumpFile.")
+      } catch {
+        case NonFatal(e) =>
+          logError(s"Heap dump failed: $heapDump.", e)
+      } finally {
+        // Mark the dump as attempted even on failure, otherwise shouldHeapDump stays true and
+        // the kill threshold in run() is never evaluated.
+        heapDumped = true
+      }
+    }
+  }
+
+  /** Creates the symbolic link `linkPath` -> `saveDir` unless something already exists there. */
+  private def linkHeapDump(saveDir: File, linkPath: String): Unit = {
+    val linkDir = new File(linkPath)
+    if (linkDir.exists()) {
+      // Each worker process only generates one heap dump. Skip when symbolic link of heap dump
+      // exists.
+      logWarning(s"Skip because symbolic link of heap dump exists: $linkPath.")
+    } else {
+      val linkParent = linkDir.getParentFile
+      if (linkParent != null && !linkParent.exists()) {
+        linkParent.mkdirs()
+      }
+      try {
+        Files.createSymbolicLink(linkDir.toPath, saveDir.toPath)
+        logInfo(s"Created symbolic link: $linkPath.")
+      } catch {
+        case NonFatal(e) =>
+          logError("Create symbolic link failed.", e)
+      }
+    }
+  }
+}
+
+/**
+ * Companion of [[JVMQuake]]: keeps a shared instance and reads the GC counters of the current
+ * process through the jvmstat monitoring API.
+ */
+object JVMQuake {
+
+  private[this] var quake: JVMQuake = _
+
+  /** Creates a new [[JVMQuake]], registers it as the shared instance and returns it. */
+  def create(conf: CelebornConf, uniqueId: String): JVMQuake = {
+    val created = new JVMQuake(conf, uniqueId)
+    set(created)
+    created
+  }
+
+  /** Returns the shared instance, or null when none has been registered. */
+  def get: JVMQuake = {
+    quake
+  }
+
+  /** Replaces the shared instance. */
+  def set(quake: JVMQuake): Unit = {
+    this.quake = quake
+  }
+
+  // Handle on the current process, resolved lazily on first counter access.
+  private[this] lazy val monitoredVm: MonitoredVm = {
+    val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost"))
+    host.getMonitoredVm(new VmIdentifier("local://%s@localhost".format(Utils.getProcessId)))
+  }
+
+  private[this] lazy val ygcExitTimeMonitor: Monitor =
+    monitoredVm.findByName("sun.gc.collector.0.lastExitTime")
+  private[this] lazy val fgcExitTimeMonitor: Monitor =
+    monitoredVm.findByName("sun.gc.collector.1.lastExitTime")
+  private[this] lazy val ygcTimeMonitor: Monitor =
+    monitoredVm.findByName("sun.gc.collector.0.time")
+  private[this] lazy val fgcTimeMonitor: Monitor =
+    monitoredVm.findByName("sun.gc.collector.1.time")
+
+  /**
+   * Reads a counter as Long. A monitor that was not found (null) or that does not hold a Long
+   * value counts as 0 instead of failing with a NullPointerException or ClassCastException.
+   */
+  private def readCounter(monitor: Monitor): Long =
+    Option(monitor).map(_.getValue) match {
+      case Some(value: java.lang.Long) => value.longValue()
+      case _ => 0L
+    }
+
+  /** Latest `lastExitTime` value among collector 0 and collector 1. */
+  private def getLastExitTime: Long =
+    Math.max(readCounter(ygcExitTimeMonitor), readCounter(fgcExitTimeMonitor))
+
+  /** Sum of the `time` counters of collector 0 and collector 1. */
+  private def getLastGCTime: Long =
+    readCounter(ygcTimeMonitor) + readCounter(fgcTimeMonitor)
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala
new file mode 100644
index 000000000..7ef49f225
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.monitor
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.junit.Assert.assertTrue
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf._
+import org.apache.celeborn.common.util.JavaUtils
+
+/**
+ * Exercises [[JVMQuake]] by generating sustained GC pressure until the monitor has taken its
+ * heap dump.
+ */
+class JVMQuakeSuite extends CelebornFunSuite {
+
+  // Keeps the allocated chunks reachable so that they occupy the heap during the test.
+  private val allocation = new ArrayBuffer[Array[Byte]]()
+
+  override def afterEach(): Unit = {
+    try {
+      allocation.clear()
+      System.gc()
+    } finally {
+      // Keep the parent's per-test cleanup running even if the cleanup above fails.
+      super.afterEach()
+    }
+  }
+
+  test("[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake") {
+    val conf = new CelebornConf()
+      .set(WORKER_JVM_QUAKE_ENABLED.key, "true")
+      .set(WORKER_JVM_QUAKE_RUNTIME_WEIGHT.key, "1")
+      .set(WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, "1s")
+      .set(WORKER_JVM_QUAKE_KILL_THRESHOLD.key, "2s")
+    val quake = new JVMQuake(conf)
+    quake.start()
+    try {
+      allocateMemory(quake)
+    } finally {
+      // Always stop the monitor so that a failing test does not leave it running.
+      quake.stop()
+    }
+
+    val heapDump = new File(s"${quake.getHeapDumpSavePath}/${quake.dumpFile}")
+    val heapDumpLink = new File(quake.getHeapDumpLinkPath)
+    try {
+      assertTrue(quake.heapDumped)
+      assert(heapDump.exists())
+    } finally {
+      // Remove the dump and its link even when an assertion above fails.
+      if (heapDump.exists()) {
+        JavaUtils.deleteRecursively(heapDump)
+      }
+      if (heapDumpLink.exists()) {
+        JavaUtils.deleteRecursively(heapDumpLink)
+      }
+    }
+  }
+
+  /**
+   * Fills a quarter of the max heap with 100 KiB chunks, then keeps replacing every chunk
+   * until the given quake no longer wants to take a heap dump.
+   */
+  def allocateMemory(quake: JVMQuake): Unit = {
+    val capacity = 1024 * 100
+    val target = Runtime.getRuntime.maxMemory / 4
+    // Multiply as Long: an Int product wraps around past 2 GiB, which would keep this loop
+    // allocating forever when the max heap is larger than 8 GiB.
+    while (allocation.size.toLong * capacity < target) {
+      allocation.append(new Array[Byte](capacity))
+    }
+    while (quake.shouldHeapDump) {
+      for (index <- allocation.indices) {
+        allocation(index) = new Array[Byte](capacity)
+      }
+    }
+  }
+}


Reply via email to