This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f74d7be0a735 [SPARK-55093][CORE] Handle TaskRunner construction
failures in launchTask
f74d7be0a735 is described below
commit f74d7be0a735a5f6fb57923048f8f0de1a74e202
Author: Zequn Lin <[email protected]>
AuthorDate: Mon Feb 2 11:25:19 2026 +0800
[SPARK-55093][CORE] Handle TaskRunner construction failures in launchTask
### What changes were proposed in this pull request?
- Move createTaskRunner into try-catch block to handle construction failures
- Add cleanup to remove TaskRunner from runningTasks if threadPool.execute
throws
- Prevent a potential memory leak by cleaning up the TaskRunner when
threadPool.execute fails
### Why are the changes needed?
The createTaskRunner method may throw an exception.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.5
Closes #53865 from ChuckLin2025/55093.
Authored-by: Zequn Lin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/executor/Executor.scala | 38 ++++++++++----
.../org/apache/spark/executor/ExecutorSuite.scala | 58 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 72d2eb87af67..1c70f74da2bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -548,17 +548,24 @@ private[spark] class Executor(
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription):
Unit = {
val taskId = taskDescription.taskId
- val tr = createTaskRunner(context, taskDescription)
- runningTasks.put(taskId, tr)
- val killMark = killMarks.get(taskId)
- if (killMark != null) {
- tr.kill(killMark._1, killMark._2)
- killMarks.remove(taskId)
- }
+ var taskRunnerOpt: Option[TaskRunner] = None
try {
+ val tr = createTaskRunner(context, taskDescription)
+ taskRunnerOpt = Some(tr)
+ runningTasks.put(taskId, tr)
+ val killMark = killMarks.get(taskId)
+ if (killMark != null) {
+ tr.kill(killMark._1, killMark._2)
+ killMarks.remove(taskId)
+ }
threadPool.execute(tr)
} catch {
case t: Throwable =>
+ // Clean up if task was added to runningTasks before the failure.
+ // If TaskRunner construction failed, taskRunnerOpt will be None and
nothing to clean up.
+ taskRunnerOpt.foreach { tr =>
+ runningTasks.remove(tr.taskId)
+ }
try {
logError(log"Executor launch task ${MDC(TASK_NAME,
taskDescription.name)} failed," +
log" reason: ${MDC(REASON, t.getMessage)}")
@@ -567,9 +574,22 @@ private[spark] class Executor(
TaskState.FAILED,
env.closureSerializer.newInstance().serialize(new
ExceptionFailure(t, Seq.empty)))
} catch {
+ case NonFatal(e) if env.isStopped =>
+ logError(
+ log"Executor update launching task " +
+ log"${MDC(TASK_NAME, taskDescription.name)} " +
+ log"failed status failed, reason: ${MDC(REASON,
t.getMessage)}" +
+ log", spark env is stopped"
+ )
+ // No need to exit the executor as the executor is already stopped.
+ // Leave it live to clean up the rest tasks and log info (similar to
SPARK-19147).
case t: Throwable =>
- logError(log"Executor update launching task ${MDC(TASK_NAME,
taskDescription.name)} " +
- log"failed status failed, reason: ${MDC(REASON, t.getMessage)}")
+ logError(
+ log"Executor update launching task " +
+ log"${MDC(TASK_NAME, taskDescription.name)} " +
+ log"failed status failed, reason: ${MDC(REASON,
t.getMessage)}" +
+ log", shutting down the executor"
+ )
System.exit(-1)
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 54627793e4e6..fec71a5285ed 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -655,6 +655,64 @@ class ExecutorSuite extends SparkFunSuite
}
}
+ test(
+ "SPARK-55093: launchTask should handle TaskRunner construction failures"
+ ) {
+ val conf = new SparkConf
+ val serializer = new JavaSerializer(conf)
+ val env = createMockEnv(conf, serializer)
+ val serializedTask = serializer.newInstance().serialize(new FakeTask(0, 0))
+ val taskDescription = createFakeTaskDescription(serializedTask)
+
+ val mockExecutorBackend = mock[ExecutorBackend]
+ val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+
+ withExecutor("id", "localhost", env) { executor =>
+ // Use reflection to make createTaskRunner throw an exception by
replacing runningTasks
+ // with a mock that throws when put is called. This simulates a failure
after TaskRunner
+ // construction but tests the same cleanup logic.
+ val executorClass = classOf[Executor]
+ val runningTasksField = executorClass.getDeclaredField("runningTasks")
+ runningTasksField.setAccessible(true)
+ val originalRunningTasks = runningTasksField.get(executor)
+
+ // Create a mock ConcurrentHashMap that throws when put is called
+ val testException = new RuntimeException("TaskRunner construction
failed")
+ type TaskRunnerType = executor.TaskRunner
+ val mockRunningTasks =
+ mock[java.util.concurrent.ConcurrentHashMap[Long, TaskRunnerType]]
+ when(mockRunningTasks.put(any[Long], any[TaskRunnerType]))
+ .thenThrow(testException)
+ runningTasksField.set(executor, mockRunningTasks)
+
+ try {
+ // Launch the task - this should catch the exception and send
statusUpdate
+ executor.launchTask(mockExecutorBackend, taskDescription)
+
+ // Verify that statusUpdate was called with FAILED state
+ verify(mockExecutorBackend).statusUpdate(
+ meq(taskDescription.taskId),
+ meq(TaskState.FAILED),
+ statusCaptor.capture()
+ )
+
+ // Verify that the exception was correctly serialized
+ val failureData = statusCaptor.getValue
+ val failReason = serializer
+ .newInstance()
+ .deserialize[ExceptionFailure](failureData)
+ assert(failReason.exception.isDefined)
+ assert(failReason.exception.get.isInstanceOf[RuntimeException])
+ assert(
+ failReason.exception.get.getMessage === "TaskRunner construction
failed"
+ )
+ } finally {
+ // Restore the original runningTasks
+ runningTasksField.set(executor, originalRunningTasks)
+ }
+ }
+ }
+
private def createMockEnv(conf: SparkConf, serializer: JavaSerializer):
SparkEnv = {
val mockEnv = mock[SparkEnv]
val mockRpcEnv = mock[RpcEnv]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]