This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3bc281c58d3 [SPARK-54217][PYTHON] Synchronize PythonRunner's 
MonitorThread kill decision
e3bc281c58d3 is described below

commit e3bc281c58d3cd0f462c5b838632f2efd32f9e86
Author: st-tran <[email protected]>
AuthorDate: Thu Jan 22 07:16:49 2026 +0900

    [SPARK-54217][PYTHON] Synchronize PythonRunner's MonitorThread kill decision
    
    ### What changes were proposed in this pull request?
    This diff addresses the synchronization issue described in SPARK-54217 by 
respecting the existing releasedOrClosed AtomicBoolean in the PythonRunner's 
kill codepath, which is currently only used in the "released" codepath - not 
the "closed" one. In doing so, we avoid erroneously destroying a still-healthy 
Python worker; in the current state, it will be destroyed & a new one will be 
created.
    
    [Jira ticket description 
follows...](https://issues.apache.org/jira/browse/SPARK-54217)
    
    
[PythonWorkerFactory](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala)
 in daemon mode will allow for worker reuse, where possible, as long as the 
worker successfully completed its last-assigned task (via 
[releasePythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650)).
 The worker wi [...]
    
    However, there is a race condition that can result in a released worker in 
the [PythonWorkerFactory idle 
queue](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116)
 getting killed. i.e. the `PythonRunner` lacks synchronization between:
    1. the main task thread's decision to release its associated Python worker 
(when work is complete), and
    2. the `MonitorThread`'s decision to kill the associated Python worker 
(when requested by the executor, e.g. speculative execution where another 
attempt succeeds).
    
    So, the following sequence of events is possible:
    1. `PythonRunner` is running
    2. The Python worker finishes its work and writes `END_OF_STREAM` to signal 
back to `PythonRunner`'s main task thread that it is done
    3. [`PythonRunner`'s main task thread receives this instruction and 
releases the worker for 
reuse](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647)
    4. For a separate reason: Executor decides to kill this task (e.g. 
speculative execution)
    5. [`PythonRunner`'s `MonitorThread` receives this instruction and kills 
the already-relinquished 
`PythonWorker`](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696)
    
    So the next task that pulls this Python worker from the idle pool will have 
a dead Python worker.
    
    ### Why are the changes needed?
    In the latest Spark release, this change is NOT critical, however, it 
avoids the unnecessary killing of a still-healthy Python worker which results 
in another one being created.
    
    * Prior to 
[SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), this would 
result in a crash (failing the task) as we would reuse this now-closed worker.
    * With [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), 
this is less of an issue, as we check that the worker is alive before we use it.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    It is not possible (or very hard) to unit test this change.
    
    However, I've created a minimal repro of the issue. The error occurs when 
running without this change, and goes away with this change.
    * Without this change: details in SPARK-54217; we see the following log 
line indicating that we are attempting to reuse killed Python workers:
      * `PythonWorker(java.nio.channels.SocketChannel[closed]) process from 
idle queue is dead, discarding.`
    * With this change:
      * First, cherry-pick [this PR](https://github.com/st-tran/spark/pull/1) 
onto this fix.
      * Then, rerun the job with this new build and we see that the errors go 
away:
    
    ```
    sttranStevens-MacBook-Pro ~/sttran-spark/work
     % ls -lathr
    total 0
    drwxr-xr-x 18 sttran  staff   576B Nov  5 21:22 app-20251105212207-0012  # 
Without the fix (has errors shown in https://github.com/st-tran/spark/pull/1)
    drwxr-xr-x 53 sttran  staff   1.7K Nov  5 22:22 ..
    drwxr-xr-x  4 sttran  staff   128B Nov  5 22:29 .
    drwxr-xr-x 18 sttran  staff   576B Nov  5 22:29 app-20251105222956-0013  # 
With the fix (no errors per search below)
    sttranStevens-MacBook-Pro ~/sttran-spark/work
     % grep -nri discard *0013
    sttranStevens-MacBook-Pro ~/sttran-spark/work
     %
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52915 from st-tran/sttran/synchronize-pythonrunner-release-destroy.
    
    Authored-by: st-tran <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/api/python/PythonRunner.scala     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 5f37e1f67b12..bc0e9f2beb2f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -350,10 +350,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       // SPARK-35009: avoid creating multiple monitor threads for the same 
python worker
       // and task context
       if (PythonRunner.runningMonitorThreads.add(key)) {
-        new MonitorThread(SparkEnv.get, worker, context).start()
+        new MonitorThread(SparkEnv.get, worker, context, 
releasedOrClosed).start()
       }
     } else {
-      new MonitorThread(SparkEnv.get, worker, context).start()
+      new MonitorThread(SparkEnv.get, worker, context, 
releasedOrClosed).start()
     }
 
     // Return an iterator that read lines from the process's stdout
@@ -725,7 +725,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
    * interrupts disabled. In that case we will need to explicitly kill the 
worker, otherwise the
    * threads can block indefinitely.
    */
-  class MonitorThread(env: SparkEnv, worker: PythonWorker, context: 
TaskContext)
+  class MonitorThread(
+      env: SparkEnv,
+      worker: PythonWorker,
+      context: TaskContext,
+      releasedOrClosed: AtomicBoolean)
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be 
interrupted. */
@@ -741,7 +745,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
       if (!context.isCompleted()) {
         Thread.sleep(taskKillTimeout)
-        if (!context.isCompleted()) {
+        if (!context.isCompleted() && releasedOrClosed.compareAndSet(false, 
true)) {
           try {
             logWarning(log"Incomplete task interrupted: Attempting to kill 
Python Worker - " +
               log"${MDC(TASK_NAME, taskIdentifier(context))}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to