[
https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Steven Tran updated SPARK-54217:
--------------------------------
Description:
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
in daemon mode allows worker reuse, where possible, as long as the worker
successfully completed its last-assigned task (via
[releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
The released worker goes into the idle queue, to be picked up by the next
[createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
call.
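For context, here is a heavily simplified sketch of that reuse pattern. All names below (`SimpleWorkerFactory`, `Worker`) are illustrative stand-ins, not the actual Spark classes:
{code:scala}
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative stand-ins for the daemon-mode reuse pattern; the real
// PythonWorkerFactory manages sockets, the daemon process, and much more state.
class Worker {
  @volatile var alive: Boolean = true
  def kill(): Unit = { alive = false } // stand-in for closing the socket / killing the process
}

class SimpleWorkerFactory {
  val idleWorkers = new ConcurrentLinkedQueue[Worker]()

  // A task asks for a worker: reuse an idle one if available, otherwise start a new one.
  def createWorker(): Worker =
    Option(idleWorkers.poll()).getOrElse(new Worker())

  // A task that finished cleanly hands its worker back for reuse by the next task.
  def releaseWorker(worker: Worker): Unit =
    idleWorkers.add(worker)
}
{code}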
However, there is a race condition that can result in a released worker in the
[PythonWorkerFactory idle
queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
getting killed: `PythonRunner` lacks synchronization between
* the main task thread's decision to release its associated Python worker
(when work is complete), and
* the `MonitorThread`'s decision to kill the associated Python worker (when
requested by the executor, e.g. speculative execution where another attempt
succeeds).
So, the following sequence of events is possible:
# `PythonRunner` is running
# The Python worker finishes its work and writes `END_OF_STREAM` to signal
back to `PythonRunner`'s main task thread that it is done
# [`PythonRunner`'s main task thread receives this instruction and releases
the worker for
reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647]
# For an unrelated reason, the executor decides to kill this task (e.g.
speculative execution where another attempt succeeded)
# [`PythonRunner`'s `MonitorThread` receives this instruction and kills the
already-relinquished
`PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696]
As a result, the next task that pulls this Python worker from the idle queue
gets a dead Python worker.
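To make the missing synchronization concrete, here is one possible way the release and kill decisions could be serialized, building on the stand-ins from the sketch above. This is an illustration only, not the actual `PythonRunner` code nor the proposed fix:
{code:scala}
// Illustrative only: both the task thread's release and the MonitorThread's
// kill go through the same lock, and the kill becomes a no-op once the worker
// has already been handed back to the idle queue.
class WorkerHandle(factory: SimpleWorkerFactory, worker: Worker) {
  private var released = false

  // Called by the main task thread after it sees END_OF_STREAM.
  def release(): Unit = synchronized {
    if (!released) {
      released = true
      factory.releaseWorker(worker)
    }
  }

  // Called by the MonitorThread when the executor asks to kill the task.
  def killIfStillOwned(): Unit = synchronized {
    if (!released) {
      worker.kill() // the worker still belongs to this task, so killing it is safe
    }
    // otherwise the worker already belongs to the idle queue; leave it alone
  }
}
{code}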
Important notes:
* Prior to SPARK-47565, this would result in a crash (failing the task), as we
would reuse the now-closed worker.
* With SPARK-47565, this is less of an issue, as we check that the worker is
alive before we use it (a simplified sketch of that check follows this list).
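For illustration, here is a simplified sketch of that kind of aliveness check, again using the stand-ins above rather than the actual SPARK-47565 change:
{code:scala}
// Illustrative only: when reusing idle workers, skip and discard any that are
// already dead instead of handing them to the next task.
object CheckedReuse {
  def createWorker(factory: SimpleWorkerFactory): Worker = {
    var candidate = factory.idleWorkers.poll()
    while (candidate != null && !candidate.alive) {
      // dead worker pulled from the idle queue: discard it and try the next one
      candidate = factory.idleWorkers.poll()
    }
    if (candidate != null) candidate else new Worker()
  }
}
{code}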
----
h2. Repro
[This PR|https://github.com/st-tran/spark/pull/1] demonstrates the issue. It
will also be rebased on top of the fix to show that the issue is resolved.
Without the fix, the logs show that we hit the warning below, indicating that
we are pulling dead workers from the idle queue because of this race:
{code:bash}
sttran@Stevens-MacBook-Pro ~/sttran-spark/work
% grep -nri discard
./app-20251105212207-0012/9/stderr:140:25/11/05 21:22:57 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
./app-20251105212207-0012/0/stderr:80:25/11/05 21:22:26 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
./app-20251105212207-0012/0/stderr:88:25/11/05 21:22:32 WARN PythonWorkerFactory: Worker PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.
...{code}
> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
> Key: SPARK-54217
> URL: https://issues.apache.org/jira/browse/SPARK-54217
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Steven Tran
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]