[ https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-54217:
-----------------------------------
Labels: pull-request-available (was: )
> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
> Key: SPARK-54217
> URL: https://issues.apache.org/jira/browse/SPARK-54217
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Steven Tran
> Priority: Major
> Labels: pull-request-available
>
> [PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
> in daemon mode allows workers to be reused, where possible, as long as the
> worker successfully completed its last-assigned task (via
> [releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
> The worker is then released into the idle queue, to be picked up by the next
> [createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
> call.
>
> However, there is a race condition that can cause a worker that has already
> been released into the [PythonWorkerFactory idle queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
> to be killed. That is, `PythonRunner` lacks synchronization between:
> * the main task thread's decision to release its associated Python worker
> (when work is complete), and
> * the `MonitorThread`'s decision to kill the associated Python worker (when
> requested by the executor, e.g. speculative execution where another attempt
> succeeds).
> So, the following sequence of events is possible:
> # `PythonRunner` is running.
> # The Python worker finishes its work and writes `END_OF_STREAM` to signal
> back to `PythonRunner`'s main task thread that it is done.
> # [`PythonRunner`'s main task thread receives this signal and releases
> the worker for
> reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647].
> # Independently, the executor decides to kill this task (e.g. because a
> speculative attempt of the same task succeeded).
> # [`PythonRunner`'s `MonitorThread` receives the kill request and kills the
> already-released
> `PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696].
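>
> The interleaving above can be replayed deterministically in a small
> single-threaded model. This is a hypothetical Python sketch (`FakeWorker`
> and `replay_race` are illustrative names, not Spark code) that executes the
> five steps in order, with no coordination between the release and the kill,
> and shows the next task receiving a worker that has already been killed.
```python
from collections import deque


class FakeWorker:
    """Hypothetical stand-in for a daemon-mode Python worker process."""

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.alive = True

    def kill(self):
        self.alive = False


def replay_race():
    """Replay steps 1-5 in order, with no release/kill synchronization."""
    idle_queue = deque()
    worker = FakeWorker(worker_id=1)  # step 1: the task runs on this worker

    # Steps 2-3: the worker sends END_OF_STREAM, so the main task thread
    # considers the task done and releases the worker for reuse.
    idle_queue.append(worker)

    # Steps 4-5: the executor kills the task (e.g. speculation) and the
    # monitor thread kills the worker it still references, even though
    # that worker is already sitting in the idle queue.
    worker.kill()

    # The next task pulls the same, now dead, worker from the idle pool.
    return idle_queue.popleft()


reused = replay_race()
print(reused.worker_id, reused.alive)  # prints: 1 False
```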
>
> As a result, the next task that pulls this worker from the idle pool
> receives a dead Python worker.
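>
> Closing the race requires the release and kill decisions to be mutually
> exclusive. A minimal sketch, assuming a single ownership state guarded by a
> lock: whichever thread claims the worker first wins, and the other leaves it
> alone. `WorkerOwnership`, `try_release` and `try_kill` are hypothetical
> names, not Spark APIs, and the actual fix may take a different approach.
```python
import threading
from enum import Enum


class Owner(Enum):
    TASK_RUNNING = "task_running"  # neither decision has been made yet
    RELEASED = "released"          # main task thread returned it to the pool
    KILLED = "killed"              # monitor thread destroyed it


class WorkerOwnership:
    """Hypothetical guard: exactly one of release/kill can ever succeed."""

    def __init__(self):
        self._lock = threading.Lock()
        self._state = Owner.TASK_RUNNING

    def _claim(self, target):
        # Check-and-set under one lock, so two threads cannot both
        # observe TASK_RUNNING and both act on the same worker.
        with self._lock:
            if self._state is not Owner.TASK_RUNNING:
                return False
            self._state = target
            return True

    def try_release(self):
        """Main task thread: True means it may put the worker in the idle queue."""
        return self._claim(Owner.RELEASED)

    def try_kill(self):
        """Monitor thread: True means it may kill the worker."""
        return self._claim(Owner.KILLED)

    @property
    def state(self):
        with self._lock:
            return self._state


# Step 3 happens first: the release wins and the late kill is a no-op.
guard = WorkerOwnership()
print(guard.try_release(), guard.try_kill(), guard.state)
# prints: True False Owner.RELEASED
```
> If the kill request instead arrives while the task is still running,
> `try_kill` wins and the main task thread's later `try_release` returns
> `False`, so a killed worker is never handed back to the idle queue.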
>
> Important notes:
> * Prior to SPARK-47565, this resulted in a crash (failing the task), because
> the now-closed worker was reused.
> * Since SPARK-47565, the impact is smaller: the worker is checked for
> liveness before it is reused, and dead workers are discarded.
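>
> The SPARK-47565 mitigation amounts to a liveness check on the reuse path.
> A minimal sketch of that idea, assuming a simple `alive` flag
> (`FakeWorker` and `take_idle_worker` are hypothetical names, not the actual
> `PythonWorkerFactory` code): dead workers popped from the idle queue are
> discarded with a warning and, if none remain, a fresh worker is spawned.
```python
import logging
from collections import deque

logger = logging.getLogger("idle_queue_sketch")


class FakeWorker:
    """Hypothetical stand-in for a pooled Python worker."""

    def __init__(self, worker_id, alive=True):
        self.worker_id = worker_id
        self.alive = alive


def take_idle_worker(idle_queue, spawn_new):
    """Pop idle workers until a live one is found; otherwise spawn a new one."""
    while idle_queue:
        worker = idle_queue.popleft()
        if worker.alive:
            return worker
        # The dead worker is dropped instead of being handed to the task.
        logger.warning("Worker %s from idle queue is dead, discarding.",
                       worker.worker_id)
    return spawn_new()


# Worker 1 was killed after release (as in the race); worker 2 is healthy.
pool = deque([FakeWorker(1, alive=False), FakeWorker(2)])
print(take_idle_worker(pool, spawn_new=lambda: FakeWorker(99)).worker_id)
# prints: 2
```
> This avoids the crash but not the race itself: the released worker is
> still killed, so the pool loses a reusable worker and may need to start a
> new one.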
>
> ----
>
> h2. Repro
>
> [This PR|https://github.com/st-tran/spark/pull/1] triggers aggressive
> speculative execution (and, in turn, task killing) to demonstrate the issue.
> It will also be applied on top of the fix to show that the issue is resolved.
>
> Without the fix, the executor stderr logs show the following warning,
> indicating that dead workers are being pulled from the idle queue because of
> this race:
> {code:java}
> sttran@Stevens-MacBook-Pro ~/sttran-spark/work
> % grep -nri discard
> ./app-20251105212207-0012/9/stderr:140:25/11/05 21:22:57 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ./app-20251105212207-0012/0/stderr:80:25/11/05 21:22:26 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ./app-20251105212207-0012/0/stderr:88:25/11/05 21:22:32 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
> ...{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)