This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 54bbd72bd [CELEBORN-1697] Improve ThreadStackTrace for thread dump
54bbd72bd is described below
commit 54bbd72bd237c876c5a075408f43dfe1f13e2768
Author: SteNicholas <[email protected]>
AuthorDate: Thu Nov 7 20:36:58 2024 +0800
[CELEBORN-1697] Improve ThreadStackTrace for thread dump
### What changes were proposed in this pull request?
Improve `ThreadStackTrace` with `synchronizers`, `monitors`, `lockName`,
`lockOwnerName`, `suspended`, `inNative` for thread dump.
### Why are the changes needed?
`ThreadStackTrace` does not currently include `synchronizers`, `monitors`,
`lockName`, `lockOwnerName`, `suspended`, or `inNative` in the stack trace.
It is recommended to improve `ThreadStackTrace` so that the thread dump
provides more details about each thread's stack trace.
### Does this PR introduce _any_ user-facing change?
The response of `ThreadStack` in `/api/v1/thread_dump` adds
`synchronizers`, `monitors`, `lockName`, `lockOwnerName`, `suspended`,
`inNative` fields.
Cherry pick:
- https://github.com/apache/spark/pull/42575
- https://github.com/apache/spark/pull/43095
### How was this patch tested?
`ApiV1BaseResourceSuite#thread_dump`
Closes #2888 from SteNicholas/CELEBORN-1697.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/common/util/ThreadUtils.scala | 48 +++-
.../org/apache/celeborn/common/util/Utils.scala | 50 ++--
.../apache/celeborn/rest/v1/model/ThreadStack.java | 270 ++++++++++++++++++++-
.../src/main/openapi3/master_rest_v1.yaml | 22 ++
.../src/main/openapi3/worker_rest_v1.yaml | 22 ++
.../common/http/api/v1/ApiV1BaseResource.scala | 6 +
.../http/api/v1/ApiV1BaseResourceSuite.scala | 1 +
7 files changed, 389 insertions(+), 30 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index dd5e74140..c8b6491b2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -424,10 +424,44 @@ case class StackTrace(elems: Seq[String]) {
* Note: code was initially copied from Apache Spark(v3.5.1).
*/
case class ThreadStackTrace(
- val threadId: Long,
- val threadName: String,
- val threadState: Thread.State,
- val stackTrace: StackTrace,
- val blockedByThreadId: Option[Long],
- val blockedByLock: String,
- val holdingLocks: Seq[String])
+ threadId: Long,
+ threadName: String,
+ threadState: Thread.State,
+ stackTrace: StackTrace,
+ blockedByThreadId: Option[Long],
+ blockedByLock: String,
+ holdingLocks: Seq[String],
+ synchronizers: Seq[String],
+ monitors: Seq[String],
+ lockName: Option[String],
+ lockOwnerName: Option[String],
+ suspended: Boolean,
+ inNative: Boolean) {
+
+ /**
+ * Returns a string representation of this thread stack trace w.r.t
java.lang.management.ThreadInfo(JDK 8)'s toString.
+ *
+ * TODO(SPARK-44896): Also considering adding information os_prio, cpu,
elapsed, tid, nid, etc., from the jstack tool
+ */
+ override def toString: String = {
+ val sb = new StringBuilder(
+ s""""$threadName" Id=$threadId $threadState""")
+ lockName.foreach(lock => sb.append(s" on $lock"))
+ lockOwnerName.foreach {
+ owner => sb.append(s"""owned by "$owner"""")
+ }
+ blockedByThreadId.foreach(id => s" Id=$id")
+ if (suspended) sb.append(" (suspended)")
+ if (inNative) sb.append(" (in native)")
+ sb.append('\n')
+
+ sb.append(stackTrace.elems.map(e => s"\tat $e").mkString)
+
+ if (synchronizers.nonEmpty) {
+ sb.append(s"\n\tNumber of locked synchronizers =
${synchronizers.length}\n")
+ synchronizers.foreach(sync => sb.append(s"\t- $sync\n"))
+ }
+ sb.append('\n')
+ sb.toString
+ }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index dc5b6ea34..7c8f16846 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -766,29 +766,39 @@ object Utils extends Logging {
* Note: code was initially copied from Apache Spark(v3.5.1).
*/
private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo):
ThreadStackTrace = {
- val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame
-> m).toMap
- val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
- monitors.get(frame) match {
- case Some(monitor) =>
- monitor.getLockedStackFrame.toString + s" => holding
${monitor.lockString}"
- case None =>
- frame.toString
- }
+ val threadState = threadInfo.getThreadState
+ val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackDepth
-> m.toString).toMap
+ val stackTrace = StackTrace(threadInfo.getStackTrace.zipWithIndex.map {
case (frame, idx) =>
+ val locked =
+ if (idx == 0 && threadInfo.getLockInfo != null) {
+ threadState match {
+ case Thread.State.BLOCKED =>
+ s"\t- blocked on ${threadInfo.getLockInfo}\n"
+ case Thread.State.WAITING | Thread.State.TIMED_WAITING =>
+ s"\t- waiting on ${threadInfo.getLockInfo}\n"
+ case _ => ""
+ }
+ } else ""
+ val locking = monitors.get(idx).map(mi => s"\t- locked
$mi\n").getOrElse("")
+ s"${frame.toString}\n$locked$locking"
})
- // use a set to dedup re-entrant locks that are held at multiple places
- val heldLocks =
- (threadInfo.getLockedSynchronizers ++
threadInfo.getLockedMonitors).map(_.lockString).toSet
-
+ val synchronizers = threadInfo.getLockedSynchronizers.map(_.toString)
+ val monitorStrs = monitors.values.toSeq
ThreadStackTrace(
- threadId = threadInfo.getThreadId,
- threadName = threadInfo.getThreadName,
- threadState = threadInfo.getThreadState,
- stackTrace = stackTrace,
- blockedByThreadId =
- if (threadInfo.getLockOwnerId < 0) None else
Some(threadInfo.getLockOwnerId),
- blockedByLock =
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
- holdingLocks = heldLocks.toSeq)
+ threadInfo.getThreadId,
+ threadInfo.getThreadName,
+ threadState,
+ stackTrace,
+ if (threadInfo.getLockOwnerId < 0) None else
Some(threadInfo.getLockOwnerId),
+ Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
+ synchronizers ++ monitorStrs,
+ synchronizers,
+ monitorStrs,
+ Option(threadInfo.getLockName),
+ Option(threadInfo.getLockOwnerName),
+ threadInfo.isSuspended,
+ threadInfo.isInNative)
}
private def readProcessStdout(process: Process): String = {
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
index dbcdd2241..ff133a23c 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
@@ -41,7 +41,15 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
ThreadStack.JSON_PROPERTY_STACK_TRACE,
ThreadStack.JSON_PROPERTY_BLOCKED_BY_THREAD_ID,
ThreadStack.JSON_PROPERTY_BLOCKED_BY_LOCK,
- ThreadStack.JSON_PROPERTY_HOLDING_LOCKS
+ ThreadStack.JSON_PROPERTY_HOLDING_LOCKS,
+ ThreadStack.JSON_PROPERTY_SYNCHRONIZERS,
+ ThreadStack.JSON_PROPERTY_MONITORS,
+ ThreadStack.JSON_PROPERTY_LOCK_NAME,
+ ThreadStack.JSON_PROPERTY_LOCK_OWNER_NAME,
+ ThreadStack.JSON_PROPERTY_SUSPENDED,
+ ThreadStack.JSON_PROPERTY_IN_NATIVE,
+ ThreadStack.JSON_PROPERTY_IS_DAEMON,
+ ThreadStack.JSON_PROPERTY_PRIORITY
})
@javax.annotation.Generated(value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
public class ThreadStack {
@@ -66,6 +74,30 @@ public class ThreadStack {
public static final String JSON_PROPERTY_HOLDING_LOCKS = "holdingLocks";
private List<String> holdingLocks = new ArrayList<>();
+ public static final String JSON_PROPERTY_SYNCHRONIZERS = "synchronizers";
+ private List<String> synchronizers = new ArrayList<>();
+
+ public static final String JSON_PROPERTY_MONITORS = "monitors";
+ private List<String> monitors = new ArrayList<>();
+
+ public static final String JSON_PROPERTY_LOCK_NAME = "lockName";
+ private String lockName;
+
+ public static final String JSON_PROPERTY_LOCK_OWNER_NAME = "lockOwnerName";
+ private String lockOwnerName;
+
+ public static final String JSON_PROPERTY_SUSPENDED = "suspended";
+ private Boolean suspended;
+
+ public static final String JSON_PROPERTY_IN_NATIVE = "inNative";
+ private Boolean inNative;
+
+ public static final String JSON_PROPERTY_IS_DAEMON = "isDaemon";
+ private Boolean isDaemon;
+
+ public static final String JSON_PROPERTY_PRIORITY = "priority";
+ private Integer priority;
+
public ThreadStack() {
}
@@ -260,6 +292,222 @@ public class ThreadStack {
this.holdingLocks = holdingLocks;
}
+ public ThreadStack synchronizers(List<String> synchronizers) {
+
+ this.synchronizers = synchronizers;
+ return this;
+ }
+
+ public ThreadStack addSynchronizersItem(String synchronizersItem) {
+ if (this.synchronizers == null) {
+ this.synchronizers = new ArrayList<>();
+ }
+ this.synchronizers.add(synchronizersItem);
+ return this;
+ }
+
+ /**
+ * The ownable synchronizers locked by the thread.
+ * @return synchronizers
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_SYNCHRONIZERS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public List<String> getSynchronizers() {
+ return synchronizers;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_SYNCHRONIZERS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setSynchronizers(List<String> synchronizers) {
+ this.synchronizers = synchronizers;
+ }
+
+ public ThreadStack monitors(List<String> monitors) {
+
+ this.monitors = monitors;
+ return this;
+ }
+
+ public ThreadStack addMonitorsItem(String monitorsItem) {
+ if (this.monitors == null) {
+ this.monitors = new ArrayList<>();
+ }
+ this.monitors.add(monitorsItem);
+ return this;
+ }
+
+ /**
+ * The object monitors locked by the thread.
+ * @return monitors
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_MONITORS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public List<String> getMonitors() {
+ return monitors;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_MONITORS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setMonitors(List<String> monitors) {
+ this.monitors = monitors;
+ }
+
+ public ThreadStack lockName(String lockName) {
+
+ this.lockName = lockName;
+ return this;
+ }
+
+ /**
+ * The string representation of the object on which the thread is blocked if
any.
+ * @return lockName
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_LOCK_NAME)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public String getLockName() {
+ return lockName;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_LOCK_NAME)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setLockName(String lockName) {
+ this.lockName = lockName;
+ }
+
+ public ThreadStack lockOwnerName(String lockOwnerName) {
+
+ this.lockOwnerName = lockOwnerName;
+ return this;
+ }
+
+ /**
+ * The name of the thread that owns the object this thread is blocked on.
+ * @return lockOwnerName
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_LOCK_OWNER_NAME)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public String getLockOwnerName() {
+ return lockOwnerName;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_LOCK_OWNER_NAME)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setLockOwnerName(String lockOwnerName) {
+ this.lockOwnerName = lockOwnerName;
+ }
+
+ public ThreadStack suspended(Boolean suspended) {
+
+ this.suspended = suspended;
+ return this;
+ }
+
+ /**
+ * Whether the thread is suspended.
+ * @return suspended
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_SUSPENDED)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Boolean getSuspended() {
+ return suspended;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_SUSPENDED)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setSuspended(Boolean suspended) {
+ this.suspended = suspended;
+ }
+
+ public ThreadStack inNative(Boolean inNative) {
+
+ this.inNative = inNative;
+ return this;
+ }
+
+ /**
+ * Whether the thread is executing native code.
+ * @return inNative
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_IN_NATIVE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Boolean getInNative() {
+ return inNative;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_IN_NATIVE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setInNative(Boolean inNative) {
+ this.inNative = inNative;
+ }
+
+ public ThreadStack isDaemon(Boolean isDaemon) {
+
+ this.isDaemon = isDaemon;
+ return this;
+ }
+
+ /**
+ * Whether the thread is a daemon thread.
+ * @return isDaemon
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_IS_DAEMON)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Boolean getIsDaemon() {
+ return isDaemon;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_IS_DAEMON)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setIsDaemon(Boolean isDaemon) {
+ this.isDaemon = isDaemon;
+ }
+
+ public ThreadStack priority(Integer priority) {
+
+ this.priority = priority;
+ return this;
+ }
+
+ /**
+ * The priority of the thread.
+ * @return priority
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_PRIORITY)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_PRIORITY)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setPriority(Integer priority) {
+ this.priority = priority;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -275,12 +523,20 @@ public class ThreadStack {
Objects.equals(this.stackTrace, threadStack.stackTrace) &&
Objects.equals(this.blockedByThreadId, threadStack.blockedByThreadId)
&&
Objects.equals(this.blockedByLock, threadStack.blockedByLock) &&
- Objects.equals(this.holdingLocks, threadStack.holdingLocks);
+ Objects.equals(this.holdingLocks, threadStack.holdingLocks) &&
+ Objects.equals(this.synchronizers, threadStack.synchronizers) &&
+ Objects.equals(this.monitors, threadStack.monitors) &&
+ Objects.equals(this.lockName, threadStack.lockName) &&
+ Objects.equals(this.lockOwnerName, threadStack.lockOwnerName) &&
+ Objects.equals(this.suspended, threadStack.suspended) &&
+ Objects.equals(this.inNative, threadStack.inNative) &&
+ Objects.equals(this.isDaemon, threadStack.isDaemon) &&
+ Objects.equals(this.priority, threadStack.priority);
}
@Override
public int hashCode() {
- return Objects.hash(threadId, threadName, threadState, stackTrace,
blockedByThreadId, blockedByLock, holdingLocks);
+ return Objects.hash(threadId, threadName, threadState, stackTrace,
blockedByThreadId, blockedByLock, holdingLocks, synchronizers, monitors,
lockName, lockOwnerName, suspended, inNative, isDaemon, priority);
}
@Override
@@ -294,6 +550,14 @@ public class ThreadStack {
sb.append(" blockedByThreadId:
").append(toIndentedString(blockedByThreadId)).append("\n");
sb.append(" blockedByLock:
").append(toIndentedString(blockedByLock)).append("\n");
sb.append(" holdingLocks:
").append(toIndentedString(holdingLocks)).append("\n");
+ sb.append(" synchronizers:
").append(toIndentedString(synchronizers)).append("\n");
+ sb.append(" monitors:
").append(toIndentedString(monitors)).append("\n");
+ sb.append(" lockName:
").append(toIndentedString(lockName)).append("\n");
+ sb.append(" lockOwnerName:
").append(toIndentedString(lockOwnerName)).append("\n");
+ sb.append(" suspended:
").append(toIndentedString(suspended)).append("\n");
+ sb.append(" inNative:
").append(toIndentedString(inNative)).append("\n");
+ sb.append(" isDaemon:
").append(toIndentedString(isDaemon)).append("\n");
+ sb.append(" priority:
").append(toIndentedString(priority)).append("\n");
sb.append("}");
return sb.toString();
}
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index a4b2802ce..108e185a3 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -561,6 +561,28 @@ components:
description: The locks that the current thread is holding.
items:
type: string
+ synchronizers:
+ type: array
+ description: The ownable synchronizers locked by the thread.
+ items:
+ type: string
+ monitors:
+ type: array
+ description: The object monitors locked by the thread.
+ items:
+ type: string
+ lockName:
+ type: string
+ description: The string representation of the object on which the
thread is blocked if any.
+ lockOwnerName:
+ type: string
+ description: The name of the thread that owns the object this thread
is blocked on.
+ suspended:
+ type: boolean
+ description: Whether the thread is suspended.
+ inNative:
+ type: boolean
+ description: Whether the thread is executing native code.
required:
- threadId
- threadName
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index f2c82ab0a..52f927db0 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -298,6 +298,28 @@ components:
description: The locks that the current thread is holding.
items:
type: string
+ synchronizers:
+ type: array
+ description: The ownable synchronizers locked by the thread.
+ items:
+ type: string
+ monitors:
+ type: array
+ description: The object monitors locked by the thread.
+ items:
+ type: string
+ lockName:
+ type: string
+ description: The string representation of the object on which the
thread is blocked if any.
+ lockOwnerName:
+ type: string
+ description: The name of the thread that owns the object this thread
is blocked on.
+ suspended:
+ type: boolean
+ description: Whether the thread is suspended.
+ inNative:
+ type: boolean
+ description: Whether the thread is executing native code.
required:
- threadId
- threadName
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
index 3b91ebd0f..908a2667c 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
@@ -57,6 +57,12 @@ class ApiV1BaseResource extends ApiRequestContext {
threadStack.blockedByThreadId.getOrElse(null).asInstanceOf[java.lang.Long])
.blockedByLock(threadStack.blockedByLock)
.holdingLocks(threadStack.holdingLocks.asJava)
+ .synchronizers(threadStack.synchronizers.asJava)
+ .monitors(threadStack.monitors.asJava)
+ .lockName(threadStack.lockName.orNull)
+ .lockOwnerName(threadStack.lockOwnerName.orNull)
+ .suspended(threadStack.suspended)
+ .inNative(threadStack.inNative)
}.asJava)
}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
index 274c65c13..876f3dbec 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
@@ -46,5 +46,6 @@ abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
val threadStacks =
response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.asScala
assert(threadStacks.nonEmpty)
assert(threadStacks.exists(_.getBlockedByThreadId == null))
+ assert(threadStacks.exists(_.getLockName != null))
}
}