This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f74d7be0a735 [SPARK-55093][CORE] Handle TaskRunner construction failures in launchTask
f74d7be0a735 is described below

commit f74d7be0a735a5f6fb57923048f8f0de1a74e202
Author: Zequn Lin <[email protected]>
AuthorDate: Mon Feb 2 11:25:19 2026 +0800

    [SPARK-55093][CORE] Handle TaskRunner construction failures in launchTask
    
    ### What changes were proposed in this pull request?
    
    - Move createTaskRunner into the try-catch block to handle construction failures
    - Add cleanup to remove the TaskRunner from runningTasks if threadPool.execute throws,
      preventing a potential memory leak from a stale TaskRunner entry (see the sketch below)
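    
    A condensed sketch of the resulting control flow in launchTask (kill-mark
    handling and the nested status-reporting catch are elided; see the full
    diff below):
    
    ```scala
    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      val taskId = taskDescription.taskId
      var taskRunnerOpt: Option[TaskRunner] = None
      try {
        // Construction, registration, and submission now all happen inside the try.
        val tr = createTaskRunner(context, taskDescription)
        taskRunnerOpt = Some(tr)
        runningTasks.put(taskId, tr)
        threadPool.execute(tr)
      } catch {
        case t: Throwable =>
          // Only a registered runner needs cleanup; if construction itself
          // failed, taskRunnerOpt is still None.
          taskRunnerOpt.foreach(tr => runningTasks.remove(tr.taskId))
          // ... report TaskState.FAILED back via context.statusUpdate ...
      }
    }
    ```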
    
    ### Why are the changes needed?
    
    createTaskRunner may throw an exception. Previously it was called outside
    the try-catch, so a construction failure escaped launchTask without any
    status update, and because the runner was registered in runningTasks before
    the try block, a failure in threadPool.execute left a stale entry behind.
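    
    For reference, the previous flow, condensed from the removed lines of the
    diff below:
    
    ```scala
    // Before this change: a throw from createTaskRunner escaped launchTask with
    // no status reported, and a throw from threadPool.execute left tr registered.
    val tr = createTaskRunner(context, taskDescription)
    runningTasks.put(taskId, tr)
    try {
      threadPool.execute(tr)
    } catch {
      case t: Throwable =>
        // reported TaskState.FAILED, but never removed tr from runningTasks
    }
    ```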
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: claude-4.5
    
    Closes #53865 from ChuckLin2025/55093.
    
    Authored-by: Zequn Lin <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala | 38 ++++++++++----
 .../org/apache/spark/executor/ExecutorSuite.scala  | 58 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 72d2eb87af67..1c70f74da2bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -548,17 +548,24 @@ private[spark] class Executor(
 
   def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
     val taskId = taskDescription.taskId
-    val tr = createTaskRunner(context, taskDescription)
-    runningTasks.put(taskId, tr)
-    val killMark = killMarks.get(taskId)
-    if (killMark != null) {
-      tr.kill(killMark._1, killMark._2)
-      killMarks.remove(taskId)
-    }
+    var taskRunnerOpt: Option[TaskRunner] = None
     try {
+      val tr = createTaskRunner(context, taskDescription)
+      taskRunnerOpt = Some(tr)
+      runningTasks.put(taskId, tr)
+      val killMark = killMarks.get(taskId)
+      if (killMark != null) {
+        tr.kill(killMark._1, killMark._2)
+        killMarks.remove(taskId)
+      }
       threadPool.execute(tr)
     } catch {
       case t: Throwable =>
+        // Clean up if task was added to runningTasks before the failure.
+        // If TaskRunner construction failed, taskRunnerOpt will be None and nothing to clean up.
+        taskRunnerOpt.foreach { tr =>
+          runningTasks.remove(tr.taskId)
+        }
         try {
           logError(log"Executor launch task ${MDC(TASK_NAME, 
taskDescription.name)} failed," +
             log" reason: ${MDC(REASON, t.getMessage)}")
@@ -567,9 +574,22 @@ private[spark] class Executor(
             TaskState.FAILED,
             env.closureSerializer.newInstance().serialize(new ExceptionFailure(t, Seq.empty)))
         } catch {
+          case NonFatal(e) if env.isStopped =>
+            logError(
+              log"Executor update launching task " +
+                log"${MDC(TASK_NAME, taskDescription.name)} " +
+                log"failed status failed, reason: ${MDC(REASON, 
t.getMessage)}" +
+                log", spark env is stopped"
+            )
+          // No need to exit the executor as the executor is already stopped.
+          // Leave it alive to clean up the remaining tasks and log info (similar to SPARK-19147).
           case t: Throwable =>
-            logError(log"Executor update launching task ${MDC(TASK_NAME, 
taskDescription.name)} " +
-              log"failed status failed, reason: ${MDC(REASON, t.getMessage)}")
+            logError(
+              log"Executor update launching task " +
+                log"${MDC(TASK_NAME, taskDescription.name)} " +
+                log"failed status failed, reason: ${MDC(REASON, 
t.getMessage)}" +
+                log", shutting down the executor"
+            )
             System.exit(-1)
         }
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 54627793e4e6..fec71a5285ed 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -655,6 +655,64 @@ class ExecutorSuite extends SparkFunSuite
     }
   }
 
+  test(
+    "SPARK-55093: launchTask should handle TaskRunner construction failures"
+  ) {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    val serializedTask = serializer.newInstance().serialize(new FakeTask(0, 0))
+    val taskDescription = createFakeTaskDescription(serializedTask)
+
+    val mockExecutorBackend = mock[ExecutorBackend]
+    val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+
+    withExecutor("id", "localhost", env) { executor =>
+      // Use reflection to replace runningTasks with a mock that throws when put is
+      // called. This simulates a failure right after TaskRunner construction and
+      // exercises the same cleanup logic in launchTask.
+      val executorClass = classOf[Executor]
+      val runningTasksField = executorClass.getDeclaredField("runningTasks")
+      runningTasksField.setAccessible(true)
+      val originalRunningTasks = runningTasksField.get(executor)
+
+      // Create a mock ConcurrentHashMap that throws when put is called
+      val testException = new RuntimeException("TaskRunner construction failed")
+      type TaskRunnerType = executor.TaskRunner
+      val mockRunningTasks =
+        mock[java.util.concurrent.ConcurrentHashMap[Long, TaskRunnerType]]
+      when(mockRunningTasks.put(any[Long], any[TaskRunnerType]))
+        .thenThrow(testException)
+      runningTasksField.set(executor, mockRunningTasks)
+
+      try {
+        // Launch the task - this should catch the exception and send statusUpdate
+        executor.launchTask(mockExecutorBackend, taskDescription)
+
+        // Verify that statusUpdate was called with FAILED state
+        verify(mockExecutorBackend).statusUpdate(
+          meq(taskDescription.taskId),
+          meq(TaskState.FAILED),
+          statusCaptor.capture()
+        )
+
+        // Verify that the exception was correctly serialized
+        val failureData = statusCaptor.getValue
+        val failReason = serializer
+          .newInstance()
+          .deserialize[ExceptionFailure](failureData)
+        assert(failReason.exception.isDefined)
+        assert(failReason.exception.get.isInstanceOf[RuntimeException])
+        assert(
+          failReason.exception.get.getMessage === "TaskRunner construction failed"
+        )
+      } finally {
+        // Restore the original runningTasks
+        runningTasksField.set(executor, originalRunningTasks)
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
