[https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

ASF GitHub Bot updated SPARK-54217:
-----------------------------------
    Labels: pull-request-available  (was: )

> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-54217
>                 URL: https://issues.apache.org/jira/browse/SPARK-54217
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Steven Tran
>            Priority: Major
>              Labels: pull-request-available
>
> In daemon mode, [PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
>  reuses a worker where possible, as long as the worker successfully 
> completed its last-assigned task (via 
> [releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
>  The released worker is placed in the idle queue, where the next 
> [createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
>  call can pick it up.
>  
> However, a race condition can cause a worker that has already been released 
> into the [PythonWorkerFactory idle 
> queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
>  to be killed. Specifically, `PythonRunner` lacks synchronization between:
>  * the main task thread's decision to release its associated Python worker 
> (when work is complete), and
>  * the `MonitorThread`'s decision to kill the associated Python worker (when 
> requested by the executor, e.g. speculative execution where another attempt 
> succeeds).
> So, the following sequence of events is possible:
>  # `PythonRunner` is running
>  # The Python worker finishes its work and writes `END_OF_STREAM` to signal 
> back to `PythonRunner`'s main task thread that it is done
>  # [`PythonRunner`'s main task thread receives this signal and releases 
> the worker for 
> reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647]
>  # Independently, the executor decides to kill this task (e.g. 
> speculative execution, where another attempt succeeds)
>  # [`PythonRunner`'s `MonitorThread` receives the kill request and kills the 
> already-released 
> `PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696]
>  
> As a result, the next task that pulls this Python worker from the idle queue 
> gets a dead worker.
>  
> Important notes:
>  * Prior to SPARK-47565, this would crash the task, because the now-closed 
> worker would be reused.
>  * Since SPARK-47565, the impact is smaller: the worker is checked for 
> liveness before it is used, and a dead worker from the idle queue is discarded.
>  
> ----
>  
> h2. Repro
>  
> [This PR|https://github.com/st-tran/spark/pull/1] triggers aggressive 
> speculative execution (and, in turn, task killing) to demonstrate the issue. 
> It will also be applied on top of the fix to show that the fix resolves 
> the issue.
>  
> Without the fix, the logs show that we hit this warning, which indicates 
> that dead workers are being pulled from the idle queue:
> {code:java}
> sttran@Stevens-MacBook-Pro ~/sttran-spark/work
>  % grep -nri discard
> ./app-20251105212207-0012/9/stderr:140:25/11/05 21:22:57 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ./app-20251105212207-0012/0/stderr:80:25/11/05 21:22:26 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ./app-20251105212207-0012/0/stderr:88:25/11/05 21:22:32 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ...{code}
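The reuse flow described in the issue (a finished worker is released into the idle queue, the next create call takes it, and, since SPARK-47565, a dead idle worker is discarded) can be modeled with a small Java sketch. It is a simplified, single-JVM model under stated assumptions: `IdlePoolSketch`, `Worker`, `release`, and `create` are hypothetical names, a boolean flag stands in for a real Python process, and none of this is Spark's actual `PythonWorkerFactory` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of daemon-mode worker reuse.
 * Names are illustrative only; this is not Spark's API.
 */
final class IdlePoolSketch {

    /** Stand-in for a Python worker process; kill() marks it dead. */
    static final class Worker {
        final int id;
        private volatile boolean alive = true;

        Worker(int id) { this.id = id; }

        boolean isAlive() { return alive; }

        void kill() { alive = false; }
    }

    private final Deque<Worker> idle = new ArrayDeque<>();
    private int nextId = 0;
    int discarded = 0; // number of dead idle workers thrown away

    /** Task finished cleanly: make the worker available for reuse. */
    synchronized void release(Worker w) {
        idle.addLast(w);
    }

    /**
     * Reuse a live idle worker if there is one, otherwise start a new one.
     * The liveness check models the post-SPARK-47565 behavior described in
     * the issue: a dead worker taken from the idle queue is discarded.
     */
    synchronized Worker create() {
        while (!idle.isEmpty()) {
            Worker w = idle.pollFirst();
            if (w.isAlive()) {
                return w;
            }
            discarded++; // corresponds to "process from idle queue is dead, discarding."
        }
        return new Worker(nextId++);
    }

    public static void main(String[] args) {
        IdlePoolSketch pool = new IdlePoolSketch();
        Worker w0 = pool.create();   // fresh worker, id 0
        pool.release(w0);            // task finished: worker goes idle
        w0.kill();                   // racing kill hits the already-released worker
        Worker next = pool.create(); // dead worker is discarded, a new one is started
        System.out.println("next id = " + next.id + ", discarded = " + pool.discarded);
    }
}
```

Running `main` walks through the problematic sequence: the worker is released, then killed while idle, so the next `create()` call discards it and starts a new worker instead of reusing one.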

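One way to make the release and kill decisions mutually exclusive is to have the main task thread and the `MonitorThread` race on a single per-task state using compare-and-set, so whichever decision lands first wins and the other becomes a no-op. The Java sketch below is an illustrative assumption, not necessarily how the fix for SPARK-54217 is implemented; `WorkerGuardSketch`, `tryRelease`, and `tryKill` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: a per-task guard that decides exactly once whether
 * the worker is released for reuse or killed.
 */
final class WorkerGuardSketch {

    enum State { RUNNING, RELEASED, KILLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Main task thread, after END_OF_STREAM: claim the worker for release. */
    boolean tryRelease() {
        return state.compareAndSet(State.RUNNING, State.RELEASED);
    }

    /** MonitorThread, on a kill request: claim the worker for killing. */
    boolean tryKill() {
        return state.compareAndSet(State.RUNNING, State.KILLED);
    }

    State state() { return state.get(); }

    public static void main(String[] args) throws InterruptedException {
        WorkerGuardSketch guard = new WorkerGuardSketch();
        boolean[] released = new boolean[1];
        boolean[] killed = new boolean[1];
        Thread taskThread = new Thread(() -> released[0] = guard.tryRelease());
        Thread monitorThread = new Thread(() -> killed[0] = guard.tryKill());
        taskThread.start();
        monitorThread.start();
        taskThread.join();
        monitorThread.join();
        // Exactly one of the two decisions succeeds, whatever the interleaving.
        System.out.println("released=" + released[0] + ", killed=" + killed[0]
                + ", state=" + guard.state());
    }
}
```

In this model, the main task thread would return the worker to the idle queue only when `tryRelease()` returns true, and the `MonitorThread` would kill it only when `tryKill()` returns true, so the kill step in the sequence from the issue can no longer reach a worker that was already released.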


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
