This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8edd591e4ef1 [SPARK-54115][CORE][UI] Escalate display ordering 
priority of connect server operation threads in thread dump page
8edd591e4ef1 is described below

commit 8edd591e4ef162b06df954772f5d1f22f096875b
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 3 20:28:37 2025 -0800

    [SPARK-54115][CORE][UI] Escalate display ordering priority of connect 
server operation threads in thread dump page
    
    ### What changes were proposed in this pull request?
    
    Escalate display ordering priority of connect server execution threads in 
the thread dump page by defining a custom `threadInfoOrdering`.
    
    For a connect server running in local deploy mode, tasks also run in the 
driver, so the driver thread dump page displays task threads first, then 
connect operation threads.
    
    For a connect server running in other deploy modes (YARN, K8s, Standalone), 
the driver thread dump page displays connect operation threads first.
    
    ### Why are the changes needed?
    
    Currently, the Spark executor displays task threads first on the thread 
dump page, which improves the user experience when troubleshooting "task 
stuck" issues.
    
    There are many similar stuck issues on the driver side too, e.g. the 
driver may be stuck on an HMS/HDFS RPC call during the query planning phase. 
Displaying the connect operation threads at the top of the driver thread dump 
page makes it easier for users to diagnose driver stuck issues for the Connect 
server.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it affects the live UI thread dump page.
    
    ### How was this patch tested?
    
    Add UT. Also manually tested.
    
    Start a connect server in local mode, and use a client to run some queries.
    
    Note: since it runs in local mode, tasks also execute on the driver 
side, so the page displays task threads first, then connect operation threads.
    
    <img width="1561" height="1038" alt="Xnip2025-10-31_18-14-16" 
src="https://github.com/user-attachments/assets/0c48412a-c6cd-4422-8126-226888ce2c56"
 />
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52816 from pan3793/SPARK-54115.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit bdac5cd192af7ce8477760f8300da95d04ef49cd)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala |  7 +++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 47 ++++++++++++++--------
 .../scala/org/apache/spark/util/UtilsSuite.scala   | 42 ++++++++++++++++++-
 .../connect/execution/ExecuteThreadRunner.scala    |  3 +-
 4 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a14ba21a0c18..eb45e810d425 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,6 +40,7 @@ import org.slf4j.{MDC => SLF4JMDC}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.Executor.TASK_THREAD_NAME_PREFIX
 import org.apache.spark.internal.{Logging, LogKeys}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config._
@@ -132,7 +133,7 @@ private[spark] class Executor(
   private[executor] val threadPool = {
     val threadFactory = new ThreadFactoryBuilder()
       .setDaemon(true)
-      .setNameFormat("Executor task launch worker-%d")
+      .setNameFormat(s"$TASK_THREAD_NAME_PREFIX-%d")
       .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, 
"unused"))
       .build()
     
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
@@ -478,7 +479,7 @@ private[spark] class Executor(
 
     val taskId = taskDescription.taskId
     val taskName = taskDescription.name
-    val threadName = s"Executor task launch worker for $taskName"
+    val threadName = s"$TASK_THREAD_NAME_PREFIX for $taskName"
     val mdcProperties = taskDescription.properties.asScala
       .filter(_._1.startsWith("mdc.")).toSeq
 
@@ -1316,6 +1317,8 @@ private[spark] class Executor(
 }
 
 private[spark] object Executor extends Logging {
+  val TASK_THREAD_NAME_PREFIX = "Executor task launch worker"
+
   // This is reserved for internal use by components that need to read task 
properties before a
   // task is fully deserialized. When possible, the 
TaskContext.getLocalProperty call should be
   // used instead.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8b1ea4d25592..81e86c82211c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -65,6 +65,7 @@ import org.slf4j.Logger
 
 import org.apache.spark.{SPARK_VERSION, _}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.Executor.TASK_THREAD_NAME_PREFIX
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys
 import org.apache.spark.internal.LogKeys._
@@ -2086,27 +2087,39 @@ private[spark] object Utils
     }
   }
 
+  val CONNECT_EXECUTE_THREAD_PREFIX = "SparkConnectExecuteThread"
+
+  private val threadInfoOrdering = Ordering.fromLessThan {
+    (threadTrace1: ThreadInfo, threadTrace2: ThreadInfo) => {
+      def priority(ti: ThreadInfo): Int = ti.getThreadName match {
+        case name if name.startsWith(TASK_THREAD_NAME_PREFIX) => 100
+        case name if name.startsWith(CONNECT_EXECUTE_THREAD_PREFIX) => 80
+        case _ => 0
+      }
+
+      val v1 = priority(threadTrace1)
+      val v2 = priority(threadTrace2)
+      if (v1 == v2) {
+        val name1 = threadTrace1.getThreadName.toLowerCase(Locale.ROOT)
+        val name2 = threadTrace2.getThreadName.toLowerCase(Locale.ROOT)
+        val nameCmpRes = name1.compareTo(name2)
+        if (nameCmpRes == 0) {
+          threadTrace1.getThreadId < threadTrace2.getThreadId
+        } else {
+          nameCmpRes < 0
+        }
+      } else {
+        v1 > v2
+      }
+    }
+  }
+
   /** Return a thread dump of all threads' stacktraces.  Used to capture dumps 
for the web UI */
   def getThreadDump(): Array[ThreadStackTrace] = {
     // We need to filter out null values here because dumpAllThreads() may 
return null array
     // elements for threads that are dead / don't exist.
-    val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, 
true).filter(_ != null)
-    threadInfos.sortWith { case (threadTrace1, threadTrace2) =>
-        val v1 = if (threadTrace1.getThreadName.contains("Executor task 
launch")) 1 else 0
-        val v2 = if (threadTrace2.getThreadName.contains("Executor task 
launch")) 1 else 0
-        if (v1 == v2) {
-          val name1 = threadTrace1.getThreadName().toLowerCase(Locale.ROOT)
-          val name2 = threadTrace2.getThreadName().toLowerCase(Locale.ROOT)
-          val nameCmpRes = name1.compareTo(name2)
-          if (nameCmpRes == 0) {
-            threadTrace1.getThreadId < threadTrace2.getThreadId
-          } else {
-            nameCmpRes < 0
-          }
-        } else {
-          v1 > v2
-        }
-    }.map(threadInfoToThreadStackTrace)
+    ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != 
null)
+      .sorted(threadInfoOrdering).map(threadInfoToThreadStackTrace)
   }
 
   /** Return a heap dump. Used to capture dumps for the web UI */
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 933b6fc39e91..d600260e9df2 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
+import java.lang.management.ThreadInfo
 import java.lang.reflect.Field
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
@@ -37,6 +38,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
 import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
 import org.apache.logging.log4j.Level
+import org.mockito.Mockito.doReturn
+import org.scalatest.PrivateMethodTester
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.internal.config._
@@ -47,7 +51,7 @@ import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.collection.Utils.createArray
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
-class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
+class UtilsSuite extends SparkFunSuite with ResetSystemProperties with 
PrivateMethodTester {
 
   test("timeConversion") {
     // Test -1
@@ -1126,6 +1130,42 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties {
     assert(pValue > threshold)
   }
 
+  test("ThreadInfoOrdering") {
+    val task1T = mock[ThreadInfo]
+    doReturn(11L).when(task1T).getThreadId
+    doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
+      .when(task1T).getThreadName
+    doReturn("Executor task launch worker for task 1.0 in stage 1.0 (TID 11)")
+      .when(task1T).toString
+
+    val task2T = mock[ThreadInfo]
+    doReturn(12L).when(task2T).getThreadId
+    doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
+      .when(task2T).getThreadName
+    doReturn("Executor task launch worker for task 2.0 in stage 1.0 (TID 22)")
+      .when(task2T).toString
+
+    val connectExecuteOp1T = mock[ThreadInfo]
+    doReturn(21L).when(connectExecuteOp1T).getThreadId
+    
doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
+      .when(connectExecuteOp1T).getThreadName
+    
doReturn("SparkConnectExecuteThread_opId=16148fb4-4189-43c3-b8d4-8b3b6ddd41c7")
+      .when(connectExecuteOp1T).toString
+
+    val connectExecuteOp2T = mock[ThreadInfo]
+    doReturn(22L).when(connectExecuteOp2T).getThreadId
+    
doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
+      .when(connectExecuteOp2T).getThreadName
+    
doReturn("SparkConnectExecuteThread_opId=4e4d1cac-ffde-46c1-b7c2-808b726cb47e")
+      .when(connectExecuteOp2T).toString
+
+    val threadInfoOrderingMethod =
+      PrivateMethod[Ordering[ThreadInfo]](Symbol("threadInfoOrdering"))
+    val sorted = Seq(connectExecuteOp1T, connectExecuteOp2T, task1T, task2T)
+      .sorted(Utils.invokePrivate(threadInfoOrderingMethod()))
+    assert(sorted === Seq(task1T, task2T, connectExecuteOp1T, 
connectExecuteOp2T))
+  }
+
   test("redact sensitive information") {
     val sparkConf = new SparkConf
 
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 38ed2528cbde..f206ee1555a7 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.service.{ExecuteHolder, 
ExecuteSessionTag, S
 import org.apache.spark.sql.connect.utils.ErrorUtils
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
+import org.apache.spark.util.Utils.CONNECT_EXECUTE_THREAD_PREFIX
 
 /**
  * This class launches the actual execution in an execution thread. The 
execution pushes the
@@ -329,7 +330,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
   }
 
   private class ExecutionThread()
-      extends 
Thread(s"SparkConnectExecuteThread_opId=${executeHolder.operationId}") {
+      extends 
Thread(s"${CONNECT_EXECUTE_THREAD_PREFIX}_opId=${executeHolder.operationId}") {
     override def run(): Unit = execute()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to