This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 493e0f10c [CELEBORN-1317][FOLLOWUP] Fix threadDump UT stuck issue
493e0f10c is described below

commit 493e0f10cfae5e1283b726b586684eac13b6f732
Author: Fei Wang <[email protected]>
AuthorDate: Mon May 27 15:12:50 2024 +0800

    [CELEBORN-1317][FOLLOWUP] Fix threadDump UT stuck issue
    
    ### What changes were proposed in this pull request?
    
    Try to fix the ApiWorkerResourceSuite::threadDump UT stuck issue.
    1. Get the thread dump programmatically instead of spawning an external process.
    
    Related code copied from apache/spark
    
https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/util/Utils.scala
    
https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
    
    ### Why are the changes needed?
    I found that the UT for the threadDump API sometimes gets stuck.
    For example: 
https://github.com/apache/celeborn/actions/runs/8462056188/job/23182806487?pr=2428
    <img width="1291" alt="image" 
src="https://github.com/apache/celeborn/assets/6757692/f39d7bb9-6e31-4ce3-a573-1ff86f335318">
    
    <img width="762" alt="image" 
src="https://github.com/apache/celeborn/assets/6757692/437592dd-fc9c-404d-a452-834fcf630bd1">
    
    The threadDump API UT was newly introduced in 
[CELEBORN-1317](https://issues.apache.org/jira/browse/CELEBORN-1317).
    
    Previously there was no UT covering it, and now it sometimes gets stuck.
    
    Previously, getThreadDump leveraged a ProcessBuilder to obtain the thread 
info.
    
    I suspect that the process gets stuck for some unknown reason, so in 
this PR we obtain the thread info programmatically instead.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    UT.
    
    
![image](https://github.com/apache/celeborn/assets/6757692/51aaa44e-0523-4b60-b6c8-f4e83c709497)
    
    Closes #2429 from turboFei/thread_dump.
    
    Lead-authored-by: Fei Wang <[email protected]>
    Co-authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/common/util/ThreadUtils.scala  | 19 +++++++
 .../org/apache/celeborn/common/util/Utils.scala    | 66 ++++++++++++++++++++--
 .../celeborn/service/deploy/master/Master.scala    |  7 ---
 .../celeborn/server/common/HttpService.scala       |  8 ++-
 .../celeborn/service/deploy/worker/Worker.scala    |  7 ---
 5 files changed, 87 insertions(+), 20 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index 74ce0ca7d..caa3719a4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -401,3 +401,22 @@ class ThreadExceptionHandler(executorService: String)
   override def uncaughtException(t: Thread, e: Throwable): Unit =
     logError(s"Uncaught exception in executor service $executorService, thread 
$t", e)
 }
+
+/**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+case class StackTrace(elems: Seq[String]) {
+  override def toString: String = elems.mkString
+}
+
+/**
+ * Note: code was initially copied from Apache Spark(v3.5.1).
+ */
+case class ThreadStackTrace(
+    val threadId: Long,
+    val threadName: String,
+    val threadState: Thread.State,
+    val stackTrace: StackTrace,
+    val blockedByThreadId: Option[Long],
+    val blockedByLock: String,
+    val holdingLocks: Seq[String])
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index c5af4abf8..285ac5991 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.common.util
 
 import java.io._
-import java.lang.management.ManagementFactory
+import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, 
ThreadInfo}
 import java.math.{MathContext, RoundingMode}
 import java.net._
 import java.nio.ByteBuffer
@@ -668,10 +668,66 @@ object Utils extends Logging {
     System.currentTimeMillis - start
   }
 
-  def getThreadDump(): String = {
-    val runtimeMXBean = ManagementFactory.getRuntimeMXBean
-    val pid = runtimeMXBean.getName.split("@")(0)
-    runCommand(s"jstack -l ${pid}")
+  /**
+   * Note: code was initially copied from Apache Spark(v3.5.1).
+   */
+  implicit private class Lock(lock: LockInfo) {
+    def lockString: String = {
+      lock match {
+        case monitor: MonitorInfo => s"Monitor(${monitor.toString})"
+        case _ => s"Lock(${lock.toString})"
+      }
+    }
+  }
+
+  /**
+   * Return a thread dump of all threads' stacktraces.
+   *
+   * <p>Note: code was initially copied from Apache Spark(v3.5.1).
+   */
+  def getThreadDump(): Seq[ThreadStackTrace] = {
+    // We need to filter out null values here because dumpAllThreads() may 
return null array
+    // elements for threads that are dead / don't exist.
+    ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != 
null)
+      .sortWith { case (threadTrace1, threadTrace2) =>
+        val name1 = threadTrace1.getThreadName().toLowerCase(Locale.ROOT)
+        val name2 = threadTrace2.getThreadName().toLowerCase(Locale.ROOT)
+        val nameCmpRes = name1.compareTo(name2)
+        if (nameCmpRes == 0) {
+          threadTrace1.getThreadId < threadTrace2.getThreadId
+        } else {
+          nameCmpRes < 0
+        }
+      }.map(threadInfoToThreadStackTrace)
+  }
+
+  /**
+   * Note: code was initially copied from Apache Spark(v3.5.1).
+   */
+  private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo): 
ThreadStackTrace = {
+    val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame 
-> m).toMap
+    val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
+      monitors.get(frame) match {
+        case Some(monitor) =>
+          monitor.getLockedStackFrame.toString + s" => holding 
${monitor.lockString}"
+        case None =>
+          frame.toString
+      }
+    })
+
+    // use a set to dedup re-entrant locks that are held at multiple places
+    val heldLocks =
+      (threadInfo.getLockedSynchronizers ++ 
threadInfo.getLockedMonitors).map(_.lockString).toSet
+
+    ThreadStackTrace(
+      threadId = threadInfo.getThreadId,
+      threadName = threadInfo.getThreadName,
+      threadState = threadInfo.getThreadState,
+      stackTrace = stackTrace,
+      blockedByThreadId =
+        if (threadInfo.getLockOwnerId < 0) None else 
Some(threadInfo.getLockOwnerId),
+      blockedByLock = 
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
+      holdingLocks = heldLocks.toSeq)
   }
 
   private def readProcessStdout(process: Process): String = {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9fb5d594b..b7eb192e8 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1224,13 +1224,6 @@ private[celeborn] class Master(
     sb.toString()
   }
 
-  override def getThreadDump: String = {
-    val sb = new StringBuilder
-    sb.append("========================= Master ThreadDump 
==========================\n")
-    sb.append(Utils.getThreadDump()).append("\n")
-    sb.toString()
-  }
-
   override def getHostnameList: String = {
     val sb = new StringBuilder
     sb.append("================= LifecycleManager Hostname List 
======================\n")
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index f11d03092..a41c18466 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -139,7 +139,13 @@ abstract class HttpService extends Service with Logging {
 
   def getWorkerInfo: String
 
-  def getThreadDump: String
+  def getThreadDump: String = {
+    val sb = new StringBuilder
+    sb.append(
+      s"========================= ${serviceName.capitalize} ThreadDump 
==========================\n")
+    sb.append(Utils.getThreadDump().mkString("\n")).append("\n")
+    sb.toString()
+  }
 
   def getShuffleList: String
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f3039fc20..bdaab805c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -755,13 +755,6 @@ private[celeborn] class Worker(
     sb.toString()
   }
 
-  override def getThreadDump: String = {
-    val sb = new StringBuilder
-    sb.append("========================= Worker ThreadDump 
==========================\n")
-    sb.append(Utils.getThreadDump()).append("\n")
-    sb.toString()
-  }
-
   override def getApplicationList: String = {
     val sb = new StringBuilder
     sb.append("================= LifecycleManager Application List 
======================\n")

Reply via email to