This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 73cf1562f [CELEBORN-1299] Introduce JVM profiling in Celeborn Worker 
using async-profiler
73cf1562f is described below

commit 73cf1562f729f1c8035eb53e3896d4a4e5132a06
Author: SteNicholas <[email protected]>
AuthorDate: Mon Mar 25 14:05:50 2024 +0800

    [CELEBORN-1299] Introduce JVM profiling in Celeborn Worker using 
async-profiler
    
    ### What changes were proposed in this pull request?
    
    Introduce JVM profiling `JVMProfiler` in Celeborn Worker using 
async-profiler to capture CPU and memory profiles.
    
    ### Why are the changes needed?
    
    [async-profiler](https://github.com/async-profiler) is a sampling profiler 
for any JDK based on the HotSpot JVM that does not suffer from the safepoint 
bias problem. It has low overhead and doesn’t rely on JVMTI. It avoids the safepoint 
bias problem by using the `AsyncGetCallTrace` API provided by HotSpot JVM to 
profile the Java code paths, and Linux’s perf_events to profile the native code 
paths. It features HotSpot-specific APIs to collect stack traces and to track 
memory allocations.
    The feature introduces a profiler plugin that does not add any overhead 
unless enabled and can be configured to accept profiler arguments as a 
configuration parameter. It supports turning profiling on and off, and includes 
the jar/binaries needed for profiling.
    
    Backport [[SPARK-46094] Support Executor JVM 
Profiling](https://github.com/apache/spark/pull/44021).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Worker cluster test.
    
    Closes #2409 from SteNicholas/CELEBORN-1299.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 28 ++++++++
 dev/deps/dependencies-server                       |  1 +
 docs/configuration/worker.md                       |  3 +
 docs/developers/jvmprofiler.md                     | 56 +++++++++++++++
 mkdocs.yml                                         |  1 +
 pom.xml                                            |  7 ++
 project/CelebornBuild.scala                        |  3 +
 worker/pom.xml                                     |  4 ++
 .../celeborn/service/deploy/worker/Worker.scala    | 11 +++
 .../deploy/worker/profiler/JVMProfiler.scala       | 80 ++++++++++++++++++++++
 10 files changed, 194 insertions(+)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 69b3a0421..9a7751a75 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -715,6 +715,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerFetchHeartbeatEnabled: Boolean = 
get(WORKER_FETCH_HEARTBEAT_ENABLED)
   def workerPartitionSplitEnabled: Boolean = 
get(WORKER_PARTITION_SPLIT_ENABLED)
   def workerActiveConnectionMax: Option[Long] = 
get(WORKER_ACTIVE_CONNECTION_MAX)
+  def workerJvmProfilerEnabled: Boolean = get(WORKER_JVM_PROFILER_ENABLED)
+  def workerJvmProfilerOptions: String = get(WORKER_JVM_PROFILER_OPTIONS)
+  def workerJvmProfilerLocalDir: String = get(WORKER_JVM_PROFILER_LOCAL_DIR)
   def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
   def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
   def workerJvmQuakeRuntimeWeight: Double = 
get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
@@ -3121,6 +3124,31 @@ object CelebornConf extends Logging {
       .longConf
       .createOptional
 
+  val WORKER_JVM_PROFILER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.jvmProfiler.enabled")
+      .categories("worker")
+      .version("0.5.0")
+      .doc("Turn on code profiling via async_profiler in workers.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val WORKER_JVM_PROFILER_OPTIONS: ConfigEntry[String] =
+    buildConf("celeborn.worker.jvmProfiler.options")
+      .categories("worker")
+      .version("0.5.0")
+      .doc("Options to pass on to the async profiler.")
+      .stringConf
+      
.createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")
+
+  val WORKER_JVM_PROFILER_LOCAL_DIR: ConfigEntry[String] =
+    buildConf("celeborn.worker.jvmProfiler.localDir")
+      .categories("worker")
+      .version("0.5.0")
+      .doc("Local file system path on worker where profiler output is saved. "
+        + "Defaults to the working directory of the worker process.")
+      .stringConf
+      .createWithDefault(".")
+
   val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.jvmQuake.enabled")
       .categories("worker")
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index fbb995def..c0f7f0a26 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -17,6 +17,7 @@
 
 HikariCP/4.0.3//HikariCP-4.0.3.jar
 RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
 commons-cli/1.5.0//commons-cli-1.5.0.jar
 commons-crypto/1.0.0//commons-crypto-1.0.0.jar
 commons-io/2.13.0//commons-io-2.13.0.jar
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 784d6b89c..68ad3f494 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -85,6 +85,9 @@ license: |
 | celeborn.worker.http.host | &lt;localhost&gt; | false | Worker's http host. 
| 0.4.0 | 
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host 
| 
 | celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | 
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port 
| 
 | celeborn.worker.internal.port | 0 | false | Internal server port on the 
Worker where the master nodes connect. | 0.5.0 |  | 
+| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling 
via async_profiler in workers. | 0.5.0 |  | 
+| celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on 
worker where profiler output is saved. Defaults to the working directory of the 
worker process. | 0.5.0 |  | 
+| celeborn.worker.jvmProfiler.options | 
event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s | false | Options to 
pass on to the async profiler. | 0.5.0 |  | 
 | celeborn.worker.jvmQuake.check.interval | 1s | false | Interval of gc 
behavior checking for worker jvm quake. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.dump.enabled | true | false | Whether to heap dump 
for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.dump.path | &lt;tmp&gt;/jvm-quake/dump/&lt;pid&gt; 
| false | The path of heap dump for the maximum GC 'deficit' during worker jvm 
quake. | 0.4.0 |  | 
diff --git a/docs/developers/jvmprofiler.md b/docs/developers/jvmprofiler.md
new file mode 100644
index 000000000..b0f745940
--- /dev/null
+++ b/docs/developers/jvmprofiler.md
@@ -0,0 +1,56 @@
+---
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+# JVM Profiler
+Since version 0.5.0, Celeborn supports a JVM sampling profiler to capture CPU 
and memory profiles. This article provides a detailed guide to code profiling 
of the Celeborn `Worker`.
+
+## Worker Code Profiling
+The JVM profiler enables code profiling of workers based on the [async 
profiler](https://github.com/async-profiler/async-profiler/blob/v2.10/README.md),
 a low overhead sampling profiler.
+This allows a `Worker` instance to capture CPU and memory profiles, which can 
later be analyzed for performance issues. 
+The profiler captures [Java Flight Recorder 
(jfr)](https://access.redhat.com/documentation/es-es/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview)
 files for each worker that can be read by tools such as Java Mission Control 
and IntelliJ.
+The profiler writes the jfr files to the directory configured by 
`celeborn.worker.jvmProfiler.localDir` (by default the `Worker`'s working 
directory) in the `Worker`'s local file system. The files can grow to be large,
+so it is advisable that the `Worker` machines have adequate storage.
+
+Code profiling is currently only supported for
+
+*   Linux (x64)
+*   Linux (arm 64)
+*   Linux (musl, x64)
+*   MacOS
+
+To get maximum profiling information set the following jvm options for the 
`Worker` :
+
+```
+-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints 
-XX:+PreserveFramePointer
+```
+
+For more information on async_profiler see the [Async Profiler 
Manual](https://krzysztofslusarski.github.io/2022/12/12/async-manual.html).
+
+To enable code profiling, set the following property in the configuration.
+
+```properties
+celeborn.worker.jvmProfiler.enabled true
+```
+
+For more configuration of code profiling refer to 
`celeborn.worker.jvmProfiler.*`.
+
+### Profiling Configuration Example
+```properties
+celeborn.worker.jvmProfiler.enabled true
+celeborn.worker.jvmProfiler.options 
event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
+```
diff --git a/mkdocs.yml b/mkdocs.yml
index 02223f416..cc4adfe49 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -92,6 +92,7 @@ nav:
         - Overview: developers/worker.md
         - Storage: developers/storage.md
         - Traffic Control: developers/trafficcontrol.md
+        - JVM Profiler: developers/jvmprofiler.md
       - Client:
         - Overview: developers/client.md
         - LifecycleManager: developers/lifecyclemanager.md
diff --git a/pom.xml b/pom.xml
index 6257b4254..5876863d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@
     <rocksdbjni.version>8.11.3</rocksdbjni.version>
     <jackson.version>2.15.3</jackson.version>
     <snappy.version>1.1.10.5</snappy.version>
+    <ap.loader.version>3.0-8</ap.loader.version>
 
     <!-- Db dependencies -->
     <mybatis.version>3.5.15</mybatis.version>
@@ -430,6 +431,12 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <!-- async-profiler loader contains async_profiler binaries for multiple 
platforms -->
+      <dependency>
+        <groupId>me.bechberger</groupId>
+        <artifactId>ap-loader-all</artifactId>
+        <version>${ap.loader.version}</version>
+      </dependency>
 
       <!-- Db dependencies -->
       <dependency>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 5bbd04c29..fc7052a36 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -36,6 +36,7 @@ object Dependencies {
   val lz4JavaVersion = 
sparkClientProjects.map(_.lz4JavaVersion).getOrElse("1.8.0")
 
   // Dependent library versions
+  val apLoaderVersion = "3.0-8"
   val commonsCompressVersion = "1.4.1"
   val commonsCryptoVersion = "1.0.0"
   val commonsIoVersion = "2.13.0"
@@ -71,6 +72,7 @@ object Dependencies {
   val protocVersion = "3.21.7"
   val protoVersion = "3.21.7"
 
+  val apLoader = "me.bechberger" % "ap-loader-all" % apLoaderVersion
   val commonsCompress = "org.apache.commons" % "commons-compress" % 
commonsCompressVersion
   val commonsCrypto = "org.apache.commons" % "commons-crypto" % 
commonsCryptoVersion excludeAll(
     ExclusionRule("net.java.dev.jna", "jna"))
@@ -495,6 +497,7 @@ object CelebornWorker {
         ExclusionRule("org.apache.ratis", "ratis-client")
       ),
       libraryDependencies ++= Seq(
+        Dependencies.apLoader,
         Dependencies.guava,
         Dependencies.commonsIo,
         Dependencies.ioNetty,
diff --git a/worker/pom.xml b/worker/pom.xml
index 5a8914e3f..c4a7c913d 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -80,6 +80,10 @@
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
     </dependency>
+    <dependency>
+      <groupId>me.bechberger</groupId>
+      <artifactId>ap-loader-all</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.celeborn</groupId>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index aeb87bb98..5ec378060 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -25,6 +25,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import com.google.common.annotations.VisibleForTesting
 import io.netty.util.HashedWheelTimer
@@ -57,6 +58,7 @@ import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionContro
 import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, 
MemoryManager}
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
 import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
+import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
 import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
 
 private[celeborn] class Worker(
@@ -326,6 +328,12 @@ private[celeborn] class Worker(
   var cleaner: ExecutorService =
     ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")
 
+  private var jvmProfiler: JVMProfiler = _
+  if (conf.workerJvmProfilerEnabled) {
+    jvmProfiler = new JVMProfiler(conf)
+    jvmProfiler.start()
+  }
+
   private var jvmQuake: JVMQuake = _
   if (conf.workerJvmQuakeEnabled) {
     jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
@@ -500,6 +508,9 @@ private[celeborn] class Worker(
     if (!stopped) {
       logInfo("Stopping Worker.")
 
+      if (jvmProfiler != null) {
+        jvmProfiler.stop()
+      }
       if (jvmQuake != null) {
         jvmQuake.stop()
       }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
new file mode 100644
index 000000000..3aa275388
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.profiler
+
+import java.io.IOException
+
+import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+
+/**
+ * The JVM profiler provides code profiling of the worker based on the async 
profiler, a low overhead sampling profiler for Java.
+ * This allows a worker to capture CPU and memory profiles, which 
can later be analyzed for performance issues.
+ * The profiler captures Java Flight Recorder (jfr) files for each worker, readable 
by tools including Java Mission Control and IntelliJ.
+ *
+ * <p> Note: The profiler writes the jfr files to the directory configured by 
`celeborn.worker.jvmProfiler.localDir` (by default the worker's working 
directory) in the worker's local file system. The files can grow to be large, 
so it is advisable
+ * that the worker machines have adequate storage.
+ *
+ * <p>Note: code copied from Apache Spark.
+ *
+ * @param conf Celeborn configuration with jvm profiler config.
+ */
+class JVMProfiler(conf: CelebornConf) extends Logging {
+
+  private var running = false
+  private val enableProfiler = conf.workerJvmProfilerEnabled
+  private val profilerOptions = conf.workerJvmProfilerOptions
+  private val profilerLocalDir = conf.workerJvmProfilerLocalDir
+
+  private val startcmd = 
s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val stopcmd = 
s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+
+  val profiler: Option[AsyncProfiler] = {
+    Option(
+      if (enableProfiler && AsyncProfilerLoader.isSupported) 
AsyncProfilerLoader.load() else null)
+  }
+
+  def start(): Unit = {
+    if (!running) {
+      try {
+        profiler.foreach(p => {
+          p.execute(startcmd)
+          logInfo("JVM profiling started.")
+          running = true
+        })
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IllegalStateException | _: 
IOException) =>
+          logError("JVM profiling aborted. Exception occurred in profiler 
native code: ", e)
+        case e: Exception => logWarning("JVM profiling aborted due to 
exception: ", e)
+      }
+    }
+  }
+
+  /** Stops the profiling and saves output to the configured local directory. */
+  def stop(): Unit = {
+    if (running) {
+      profiler.foreach(p => {
+        p.execute(stopcmd)
+        logInfo("JVM profiler stopped.")
+        running = false
+      })
+    }
+  }
+}

Reply via email to