This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c34ec411244 [SPARK-44818] Fix race for pending task kill issued before taskThread is initialized
c34ec411244 is described below

commit c34ec41124446164e9cfdd34101f25c6aa0ae235
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Aug 21 13:25:28 2023 -0700

    [SPARK-44818] Fix race for pending task kill issued before taskThread is initialized
    
    ### What changes were proposed in this pull request?
    Fix race for pending task kill issued before taskThread is initialized
    
    ### Why are the changes needed?
    There is a race for tasks that are interrupted through stage cancellation: such a task may already have been added to the TaskSet, but not yet have its taskThread initialized.
    
    To handle stage cancellation, we try to kill all ongoing task attempts:
    
    ```
        logInfo("Cancelling stage " + stageId)
        // Kill all running tasks for the stage.
        killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
        // Cancel all attempts for the stage.
    ```
    
    However, there is a chance that taskThread is not yet initialized, in which case we only set reasonIfKilled:
    
    ```
      def kill(interruptThread: Boolean, reason: String): Unit = {
        require(reason != null)
        _reasonIfKilled = reason
        if (context != null) {
          context.markInterrupted(reason)
        }
        if (interruptThread && taskThread != null) {
          taskThread.interrupt()  // <--- never hit
        }
    ```
    
    Then, within the task execution thread itself, we call kill again since reasonIfKilled is set. However, this time we explicitly pass interruptThread as false, because we don't know the status of the previous call:
    
    ```
        taskThread = Thread.currentThread()
    
        if (_reasonIfKilled != null) {
          kill(interruptThread = false, _reasonIfKilled)  // <-- only context will be set
        }
    ```
    
    The TaskReaper has also finished its one and only attempt at task interruption, since we don't retry in this case. As a result, the task is never interrupted even once; it can then block on I/O or wait calls that don't finish within the reaper timeout, leading to the JVM being killed:
    
    ```
            taskRunner.kill(interruptThread = interruptThread, reason = reason)
    ```
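
    The lost-interrupt interleaving can be summarized in a minimal, self-contained sketch. The class and field names below are illustrative stand-ins, not Spark's actual internals:

    ```
    // Illustrative reproduction of the race; not Spark's real classes.
    class SketchTaskRunner {
      @volatile private var taskThread: Thread = _
      @volatile private var reasonIfKilled: String = _
      @volatile var wasInterrupted = false

      def kill(interruptThread: Boolean, reason: String): Unit = {
        reasonIfKilled = reason
        if (interruptThread && taskThread != null) {
          // Skipped entirely if run() has not yet assigned taskThread.
          taskThread.interrupt()
          wasInterrupted = true
        }
      }

      def run(): Unit = {
        taskThread = Thread.currentThread()
        if (reasonIfKilled != null) {
          // The retry passes interruptThread = false, so no interrupt here either.
          kill(interruptThread = false, reasonIfKilled)
        }
        // ... task body would block on I/O here, never seeing an interrupt ...
      }
    }

    object RaceSketch extends App {
      val runner = new SketchTaskRunner
      runner.kill(interruptThread = true, reason = "stage cancelled") // before run()
      runner.run()
      assert(!runner.wasInterrupted) // the interrupt was lost
    }
    ```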
    
    The change fixes this issue by checking for the presence of `reasonIfKilled` on the context and throwing a `TaskKilledException` before we execute `runTask`, thereby preventing execution of the actual task, freeing up the slot, and avoiding future issues with the reaper.
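
    The shape of that pre-run guard can be sketched as follows. The names `reasonIfKilled` and `TaskKilledException` follow the commit; `runGuarded` is a hypothetical wrapper used only for illustration:

    ```
    // Hypothetical wrapper showing the shape of the fix: fail fast if a kill
    // reason was recorded before the task body starts.
    class TaskKilledException(val reason: String) extends RuntimeException(reason)

    def runGuarded[T](reasonIfKilled: Option[String])(body: => T): T = {
      reasonIfKilled.foreach { reason => throw new TaskKilledException(reason) }
      body // only reached when no kill was pending
    }
    ```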
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    [info] JobCancellationSuite:
    ...
    [info] Run completed in 35 seconds, 781 milliseconds.
    [info] Total number of tests run: 13
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #42504 from anishshri-db/task/SPARK-44818.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 core/src/main/scala/org/apache/spark/TaskContext.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 450c00928c9..0f8a10d734b 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -158,6 +158,11 @@ abstract class TaskContext extends Serializable {
   /** Runs a task with this context, ensuring failure and completion listeners get triggered. */
   private[spark] def runTaskWithListeners[T](task: Task[T]): T = {
     try {
+      // SPARK-44818 - It's possible that taskThread has not been initialized when kill is initially
+      // called with interruptThread=true. We do set the reason and eventually will set it on the
+      // context too within run(). If that's the case, kill the thread before it starts executing
+      // the actual task.
+      killTaskIfInterrupted()
       task.runTask(this)
     } catch {
       case e: Throwable =>

