This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 73cf1562f [CELEBORN-1299] Introduce JVM profiling in Celeborn Worker
using async-profiler
73cf1562f is described below
commit 73cf1562f729f1c8035eb53e3896d4a4e5132a06
Author: SteNicholas <[email protected]>
AuthorDate: Mon Mar 25 14:05:50 2024 +0800
[CELEBORN-1299] Introduce JVM profiling in Celeborn Worker using
async-profiler
### What changes were proposed in this pull request?
Introduce JVM profiling `JVMProfiler` in Celeborn Worker using
async-profiler to capture CPU and memory profiles.
### Why are the changes needed?
[async-profiler](https://github.com/async-profiler) is a sampling profiler
for any JDK based on the HotSpot JVM that does not suffer from the safepoint bias
problem. It has low overhead and doesn’t rely on JVMTI. It avoids the safepoint
bias problem by using the `AsyncGetCallTrace` API provided by the HotSpot JVM to
profile Java code paths, and Linux’s perf_events to profile native code
paths. It uses HotSpot-specific APIs to collect stack traces and to track
memory allocations.
The feature introduces a profiler plugin that does not add any overhead
unless enabled and can be configured to accept profiler arguments as a
configuration parameter. It supports turning profiling on/off and includes
the jar/binaries needed for profiling.
Backport [[SPARK-46094] Support Executor JVM
Profiling](https://github.com/apache/spark/pull/44021).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Worker cluster test.
Closes #2409 from SteNicholas/CELEBORN-1299.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 28 ++++++++
dev/deps/dependencies-server | 1 +
docs/configuration/worker.md | 3 +
docs/developers/jvmprofiler.md | 56 +++++++++++++++
mkdocs.yml | 1 +
pom.xml | 7 ++
project/CelebornBuild.scala | 3 +
worker/pom.xml | 4 ++
.../celeborn/service/deploy/worker/Worker.scala | 11 +++
.../deploy/worker/profiler/JVMProfiler.scala | 80 ++++++++++++++++++++++
10 files changed, 194 insertions(+)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 69b3a0421..9a7751a75 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -715,6 +715,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerFetchHeartbeatEnabled: Boolean =
get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean =
get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] =
get(WORKER_ACTIVE_CONNECTION_MAX)
+ // Typed accessors for the celeborn.worker.jvmProfiler.* entries; read by the
+ // worker and by its JVMProfiler.
+ def workerJvmProfilerEnabled: Boolean = get(WORKER_JVM_PROFILER_ENABLED)
+ def workerJvmProfilerOptions: String = get(WORKER_JVM_PROFILER_OPTIONS)
+ def workerJvmProfilerLocalDir: String = get(WORKER_JVM_PROFILER_LOCAL_DIR)
def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
def workerJvmQuakeRuntimeWeight: Double =
get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
@@ -3121,6 +3124,31 @@ object CelebornConf extends Logging {
.longConf
.createOptional
+ // Whether the worker creates and starts a JVMProfiler while it is being constructed.
+ val WORKER_JVM_PROFILER_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.jvmProfiler.enabled")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Turn on code profiling via async_profiler in workers.")
+ .booleanConf
+ .createWithDefault(false)
+
+ // Spliced verbatim into async-profiler's start/stop commands, ahead of the
+ // ",file=<localDir>/profile.jfr" argument that JVMProfiler appends.
+ val WORKER_JVM_PROFILER_OPTIONS: ConfigEntry[String] =
+ buildConf("celeborn.worker.jvmProfiler.options")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Options to pass on to the async profiler.")
+ .stringConf
+
.createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")
+
+ // Directory in which JVMProfiler writes its profile.jfr output.
+ val WORKER_JVM_PROFILER_LOCAL_DIR: ConfigEntry[String] =
+ buildConf("celeborn.worker.jvmProfiler.localDir")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Local file system path on worker where profiler output is saved. "
+ + "Defaults to the working directory of the worker process.")
+ .stringConf
+ .createWithDefault(".")
+
val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.enabled")
.categories("worker")
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index fbb995def..c0f7f0a26 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -17,6 +17,7 @@
HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
commons-io/2.13.0//commons-io-2.13.0.jar
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 784d6b89c..68ad3f494 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -85,6 +85,9 @@ license: |
| celeborn.worker.http.host | <localhost> | false | Worker's http host.
| 0.4.0 |
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
|
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 |
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
|
| celeborn.worker.internal.port | 0 | false | Internal server port on the
Worker where the master nodes connect. | 0.5.0 | |
+| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling
via async_profiler in workers. | 0.5.0 | |
+| celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on
worker where profiler output is saved. Defaults to the working directory of the
worker process. | 0.5.0 | |
+| celeborn.worker.jvmProfiler.options |
event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s | false | Options to
pass on to the async profiler. | 0.5.0 | |
| celeborn.worker.jvmQuake.check.interval | 1s | false | Interval of gc
behavior checking for worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.enabled | true | false | Whether to heap dump
for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.path | <tmp>/jvm-quake/dump/<pid>
| false | The path of heap dump for the maximum GC 'deficit' during worker jvm
quake. | 0.4.0 | |
diff --git a/docs/developers/jvmprofiler.md b/docs/developers/jvmprofiler.md
new file mode 100644
index 000000000..b0f745940
--- /dev/null
+++ b/docs/developers/jvmprofiler.md
@@ -0,0 +1,56 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+# JVM Profiler
+Since version 0.5.0, Celeborn supports a JVM sampling profiler to capture CPU and memory profiles. This article provides a detailed guide to Celeborn `Worker`'s code profiling.
+
+## Worker Code Profiling
+The JVM profiler enables code profiling of workers based on the [async profiler](https://github.com/async-profiler/async-profiler/blob/v3.0/README.md), a low overhead sampling profiler.
+This allows a `Worker` instance to capture CPU and memory profiles that can later be analyzed for performance issues.
+The profiler captures [Java Flight Recorder (jfr)](https://access.redhat.com/documentation/en-us/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview) files for each worker that can be read by tools like Java Mission Control and IntelliJ IDEA.
+The profiler writes the jfr files to the directory configured by `celeborn.worker.jvmProfiler.localDir` (by default the `Worker`'s working directory) in the `Worker`'s local file system. The files can grow to be large,
+so it is advisable that the `Worker` machines have adequate storage.
+
+Code profiling is currently only supported on:
+
+* Linux (x64)
+* Linux (arm 64)
+* Linux (musl, x64)
+* MacOS
+
+To get maximum profiling information, set the following JVM options for the `Worker`:
+
+```
+-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints
-XX:+PreserveFramePointer
+```
+
+For more information on async_profiler see the [Async Profiler
Manual](https://krzysztofslusarski.github.io/2022/12/12/async-manual.html).
+
+To enable code profiling, set the following in the Celeborn configuration:
+
+```properties
+celeborn.worker.jvmProfiler.enabled true
+```
+
+For more code profiling options, refer to the `celeborn.worker.jvmProfiler.*` configurations.
+
+### Profiling Configuration Example
+```properties
+celeborn.worker.jvmProfiler.enabled true
+celeborn.worker.jvmProfiler.options
event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
+```
diff --git a/mkdocs.yml b/mkdocs.yml
index 02223f416..cc4adfe49 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -92,6 +92,7 @@ nav:
- Overview: developers/worker.md
- Storage: developers/storage.md
- Traffic Control: developers/trafficcontrol.md
+ - JVM Profiler: developers/jvmprofiler.md
- Client:
- Overview: developers/client.md
- LifecycleManager: developers/lifecyclemanager.md
diff --git a/pom.xml b/pom.xml
index 6257b4254..5876863d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@
<rocksdbjni.version>8.11.3</rocksdbjni.version>
<jackson.version>2.15.3</jackson.version>
<snappy.version>1.1.10.5</snappy.version>
+ <ap.loader.version>3.0-8</ap.loader.version>
<!-- Db dependencies -->
<mybatis.version>3.5.15</mybatis.version>
@@ -430,6 +431,12 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- async-profiler loader contains async_profiler binaries for multiple
platforms -->
+ <dependency>
+ <groupId>me.bechberger</groupId>
+ <artifactId>ap-loader-all</artifactId>
+ <version>${ap.loader.version}</version>
+ </dependency>
<!-- Db dependencies -->
<dependency>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 5bbd04c29..fc7052a36 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -36,6 +36,7 @@ object Dependencies {
val lz4JavaVersion =
sparkClientProjects.map(_.lz4JavaVersion).getOrElse("1.8.0")
// Dependent library versions
+ val apLoaderVersion = "3.0-8"
val commonsCompressVersion = "1.4.1"
val commonsCryptoVersion = "1.0.0"
val commonsIoVersion = "2.13.0"
@@ -71,6 +72,7 @@ object Dependencies {
val protocVersion = "3.21.7"
val protoVersion = "3.21.7"
+ val apLoader = "me.bechberger" % "ap-loader-all" % apLoaderVersion
val commonsCompress = "org.apache.commons" % "commons-compress" %
commonsCompressVersion
val commonsCrypto = "org.apache.commons" % "commons-crypto" %
commonsCryptoVersion excludeAll(
ExclusionRule("net.java.dev.jna", "jna"))
@@ -495,6 +497,7 @@ object CelebornWorker {
ExclusionRule("org.apache.ratis", "ratis-client")
),
libraryDependencies ++= Seq(
+ Dependencies.apLoader,
Dependencies.guava,
Dependencies.commonsIo,
Dependencies.ioNetty,
diff --git a/worker/pom.xml b/worker/pom.xml
index 5a8914e3f..c4a7c913d 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -80,6 +80,10 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
+ <dependency>
+ <groupId>me.bechberger</groupId>
+ <artifactId>ap-loader-all</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index aeb87bb98..5ec378060 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -25,6 +25,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
import scala.collection.JavaConverters._
+import scala.util.Random
import com.google.common.annotations.VisibleForTesting
import io.netty.util.HashedWheelTimer
@@ -57,6 +58,7 @@ import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionContro
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter,
MemoryManager}
import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
+import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
import
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter,
StorageManager}
private[celeborn] class Worker(
@@ -326,6 +328,12 @@ private[celeborn] class Worker(
var cleaner: ExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")
+ // Optional async-profiler based code profiling, created and started eagerly when
+ // celeborn.worker.jvmProfiler.enabled is set; stopped when the worker is stopped.
+ private var jvmProfiler: JVMProfiler = _
+ if (conf.workerJvmProfilerEnabled) {
+ jvmProfiler = new JVMProfiler(conf)
+ jvmProfiler.start()
+ }
+
private var jvmQuake: JVMQuake = _
if (conf.workerJvmQuakeEnabled) {
jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
@@ -500,6 +508,9 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")
+ if (jvmProfiler != null) {
+ jvmProfiler.stop()
+ }
if (jvmQuake != null) {
jvmQuake.stop()
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
new file mode 100644
index 000000000..3aa275388
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/profiler/JVMProfiler.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.profiler
+
+import java.io.IOException
+
+import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+
+/**
+ * The JVM profiler provides code profiling of the worker based on async-profiler, a low-overhead
+ * sampling profiler for Java. This allows a worker to capture CPU and memory profiles that can
+ * later be analyzed for performance issues. The profiler captures Java Flight Recorder (jfr)
+ * files that can be read by tools such as Java Mission Control and IntelliJ IDEA.
+ *
+ * <p>Note: The profiler writes `profile.jfr` to `celeborn.worker.jvmProfiler.localDir` (by
+ * default the worker's working directory) on the worker's local file system. The file can grow
+ * to be large, so it is advisable that the worker machines have adequate storage.
+ *
+ * <p>Profiling is an optional diagnostic: failures to load, start or stop the profiler are
+ * logged and never propagated to the worker.
+ *
+ * <p>Note: code copied from Apache Spark.
+ *
+ * @param conf Celeborn configuration with jvm profiler config.
+ */
+class JVMProfiler(conf: CelebornConf) extends Logging {
+
+  private var running = false
+  private val enableProfiler = conf.workerJvmProfilerEnabled
+  private val profilerOptions = conf.workerJvmProfilerOptions
+  private val profilerLocalDir = conf.workerJvmProfilerLocalDir
+
+  // Both commands name the same output file so that `stop` writes the recording started by `start`.
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+
+  /**
+   * The loaded async-profiler, or `None` when profiling is disabled, the current platform is not
+   * supported, or the native library could not be loaded. Load failures are logged rather than
+   * thrown so they cannot prevent the worker from being constructed.
+   */
+  val profiler: Option[AsyncProfiler] =
+    if (!enableProfiler) {
+      None
+    } else {
+      try {
+        if (AsyncProfilerLoader.isSupported) {
+          Option(AsyncProfilerLoader.load())
+        } else {
+          logWarning("JVM profiling is not supported on this platform.")
+          None
+        }
+      } catch {
+        // UnsatisfiedLinkError is a LinkageError, not an Exception, and is the typical
+        // failure mode when the bundled native library cannot be loaded.
+        case e @ (_: Exception | _: UnsatisfiedLinkError) =>
+          logWarning("JVM profiling disabled: failed to load async-profiler.", e)
+          None
+      }
+    }
+
+  /** Starts profiling if it is not already running; failures are logged and leave it stopped. */
+  def start(): Unit = {
+    if (!running) {
+      try {
+        profiler.foreach { p =>
+          p.execute(startcmd)
+          logInfo("JVM profiling started.")
+          running = true
+        }
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
+          logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
+        case e: Exception => logWarning("JVM profiling aborted due to exception: ", e)
+      }
+    }
+  }
+
+  /**
+   * Stops profiling and writes the recording to `profilerLocalDir/profile.jfr` on the local file
+   * system. Failures are logged rather than thrown so they cannot abort the worker's shutdown;
+   * the profiler is marked as not running either way.
+   */
+  def stop(): Unit = {
+    if (running) {
+      try {
+        profiler.foreach { p =>
+          p.execute(stopcmd)
+          logInfo("JVM profiler stopped.")
+        }
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
+          logError(
+            "Failed to stop JVM profiling. Exception occurred in profiler native code: ",
+            e)
+        case e: Exception => logWarning("Failed to stop JVM profiling due to exception: ", e)
+      } finally {
+        running = false
+      }
+    }
+  }
+}