This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 493e0f10c [CELEBORN-1317][FOLLOWUP] Fix threadDump UT stuck issue
493e0f10c is described below
commit 493e0f10cfae5e1283b726b586684eac13b6f732
Author: Fei Wang <[email protected]>
AuthorDate: Mon May 27 15:12:50 2024 +0800
[CELEBORN-1317][FOLLOWUP] Fix threadDump UT stuck issue
### What changes were proposed in this pull request?
Try to fix the issue where the ApiWorkerResourceSuite::threadDump UT gets stuck.
1. Obtain the thread dump programmatically (via the JVM's ThreadMXBean) instead of running an external `jstack` command.
Related code copied from apache/spark
https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/util/Utils.scala
https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
### Why are the changes needed?
I found that the UT for the threadDump API sometimes gets stuck.
For example:
https://github.com/apache/celeborn/actions/runs/8462056188/job/23182806487?pr=2428
<img width="1291" alt="image"
src="https://github.com/apache/celeborn/assets/6757692/f39d7bb9-6e31-4ce3-a573-1ff86f335318">
<img width="762" alt="image"
src="https://github.com/apache/celeborn/assets/6757692/437592dd-fc9c-404d-a452-834fcf630bd1">
The threadDump API UT was newly introduced in
[CELEBORN-1317](https://issues.apache.org/jira/browse/CELEBORN-1317).
Before that, there was no UT covering this API, and now the new UT sometimes gets stuck.
Previously, getThreadDump used a ProcessBuilder (running `jstack`) to collect the
thread info.
I suspect that the spawned process hangs for some unknown reason, so in
this PR we obtain the thread info programmatically instead.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.

Closes #2429 from turboFei/thread_dump.
Lead-authored-by: Fei Wang <[email protected]>
Co-authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/common/util/ThreadUtils.scala | 19 +++++++
.../org/apache/celeborn/common/util/Utils.scala | 66 ++++++++++++++++++++--
.../celeborn/service/deploy/master/Master.scala | 7 ---
.../celeborn/server/common/HttpService.scala | 8 ++-
.../celeborn/service/deploy/worker/Worker.scala | 7 ---
5 files changed, 87 insertions(+), 20 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index 74ce0ca7d..caa3719a4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -401,3 +401,22 @@ class ThreadExceptionHandler(executorService: String)
override def uncaughtException(t: Thread, e: Throwable): Unit =
logError(s"Uncaught exception in executor service $executorService, thread
$t", e)
}
+
+/**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+case class StackTrace(elems: Seq[String]) {
+ override def toString: String = elems.mkString
+}
+
+/**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+case class ThreadStackTrace(
+ val threadId: Long,
+ val threadName: String,
+ val threadState: Thread.State,
+ val stackTrace: StackTrace,
+ val blockedByThreadId: Option[Long],
+ val blockedByLock: String,
+ val holdingLocks: Seq[String])
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index c5af4abf8..285ac5991 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.common.util
import java.io._
-import java.lang.management.ManagementFactory
+import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo,
ThreadInfo}
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
@@ -668,10 +668,66 @@ object Utils extends Logging {
System.currentTimeMillis - start
}
- def getThreadDump(): String = {
- val runtimeMXBean = ManagementFactory.getRuntimeMXBean
- val pid = runtimeMXBean.getName.split("@")(0)
- runCommand(s"jstack -l ${pid}")
+ /**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+ implicit private class Lock(lock: LockInfo) {
+ def lockString: String = {
+ lock match {
+ case monitor: MonitorInfo => s"Monitor(${monitor.toString})"
+ case _ => s"Lock(${lock.toString})"
+ }
+ }
+ }
+
+ /**
+ * Return a thread dump of all threads' stacktraces.
+ *
+ * <p>Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+ def getThreadDump(): Seq[ThreadStackTrace] = {
+ // We need to filter out null values here because dumpAllThreads() may
return null array
+ // elements for threads that are dead / don't exist.
+ ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ !=
null)
+ .sortWith { case (threadTrace1, threadTrace2) =>
+ val name1 = threadTrace1.getThreadName().toLowerCase(Locale.ROOT)
+ val name2 = threadTrace2.getThreadName().toLowerCase(Locale.ROOT)
+ val nameCmpRes = name1.compareTo(name2)
+ if (nameCmpRes == 0) {
+ threadTrace1.getThreadId < threadTrace2.getThreadId
+ } else {
+ nameCmpRes < 0
+ }
+ }.map(threadInfoToThreadStackTrace)
+ }
+
+ /**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+ private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo):
ThreadStackTrace = {
+ val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame
-> m).toMap
+ val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
+ monitors.get(frame) match {
+ case Some(monitor) =>
+ monitor.getLockedStackFrame.toString + s" => holding
${monitor.lockString}"
+ case None =>
+ frame.toString
+ }
+ })
+
+ // use a set to dedup re-entrant locks that are held at multiple places
+ val heldLocks =
+ (threadInfo.getLockedSynchronizers ++
threadInfo.getLockedMonitors).map(_.lockString).toSet
+
+ ThreadStackTrace(
+ threadId = threadInfo.getThreadId,
+ threadName = threadInfo.getThreadName,
+ threadState = threadInfo.getThreadState,
+ stackTrace = stackTrace,
+ blockedByThreadId =
+ if (threadInfo.getLockOwnerId < 0) None else
Some(threadInfo.getLockOwnerId),
+ blockedByLock =
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
+ holdingLocks = heldLocks.toSeq)
}
private def readProcessStdout(process: Process): String = {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9fb5d594b..b7eb192e8 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1224,13 +1224,6 @@ private[celeborn] class Master(
sb.toString()
}
- override def getThreadDump: String = {
- val sb = new StringBuilder
- sb.append("========================= Master ThreadDump
==========================\n")
- sb.append(Utils.getThreadDump()).append("\n")
- sb.toString()
- }
-
override def getHostnameList: String = {
val sb = new StringBuilder
sb.append("================= LifecycleManager Hostname List
======================\n")
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index f11d03092..a41c18466 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -139,7 +139,13 @@ abstract class HttpService extends Service with Logging {
def getWorkerInfo: String
- def getThreadDump: String
+ def getThreadDump: String = {
+ val sb = new StringBuilder
+ sb.append(
+ s"========================= ${serviceName.capitalize} ThreadDump
==========================\n")
+ sb.append(Utils.getThreadDump().mkString("\n")).append("\n")
+ sb.toString()
+ }
def getShuffleList: String
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f3039fc20..bdaab805c 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -755,13 +755,6 @@ private[celeborn] class Worker(
sb.toString()
}
- override def getThreadDump: String = {
- val sb = new StringBuilder
- sb.append("========================= Worker ThreadDump
==========================\n")
- sb.append(Utils.getThreadDump()).append("\n")
- sb.toString()
- }
-
override def getApplicationList: String = {
val sb = new StringBuilder
sb.append("================= LifecycleManager Application List
======================\n")