[
https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Steven Tran updated SPARK-54217:
--------------------------------
Description:
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
in daemon mode allows worker reuse where possible, as long as the worker
successfully completed its last assigned task (via
[releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
The worker is released into the idle queue, to be picked up by the next
[createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
call.
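For orientation, here is a minimal Scala sketch of that reuse pattern. The
`Worker` and `IdlePool` names are illustrative stand-ins, not Spark's actual
classes:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative stand-in for a daemon-forked Python worker.
final class Worker(val id: Int) {
  @volatile var alive: Boolean = true
  def kill(): Unit = alive = false
}

object IdlePool {
  private val idle = new ConcurrentLinkedQueue[Worker]()
  private var nextId = 0

  // Analogue of createPythonWorker: reuse an idle worker if one exists,
  // otherwise fork a fresh one.
  def create(): Worker = synchronized {
    Option(idle.poll()).getOrElse { nextId += 1; new Worker(nextId) }
  }

  // Analogue of releasePythonWorker: hand a finished worker back for reuse.
  def release(w: Worker): Unit = idle.offer(w)
}
```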
However, there is a race condition that can result in a released worker in the
[PythonWorkerFactory idle
queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
being killed: `PythonRunner` does not synchronize between
* the main task thread's decision to release its associated Python worker
(when work is complete), and
* the `MonitorThread`'s decision to kill that same Python worker (when
requested by the executor, e.g. under speculative execution where another
attempt succeeds); see the sketch after this list.
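A sketch of the two uncoordinated decision points, reusing the illustrative
types above (again, a simplification, not Spark's actual code paths):

```scala
// Reuses Worker and IdlePool from the sketch above. The two decision points
// act on the same worker but never coordinate with each other.
object UnsynchronizedPaths {
  // Main task thread, on successful completion of the Python work:
  def onTaskSuccess(worker: Worker): Unit =
    IdlePool.release(worker) // worker is now visible to the next create() call

  // MonitorThread, on a kill request from the executor:
  def onTaskKilled(worker: Worker): Unit =
    worker.kill() // nothing prevents this from firing after release() above
}
```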
The following sequence of events is then possible:
# `PythonRunner` is running.
# The Python worker finishes its work and writes `END_OF_STREAM` to signal
to `PythonRunner`'s main task thread that it is done.
# [`PythonRunner`'s main task thread receives this signal and releases
the worker for
reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647].
# For an unrelated reason, the executor decides to kill this task (e.g.
speculative execution).
# [`PythonRunner`'s `MonitorThread` receives this instruction and kills the
already-relinquished
`PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696].
The next task that pulls this Python worker from the idle pool therefore
receives a dead worker.
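One way the race could be closed (a sketch of the general technique in the
terms of the earlier stand-ins, not Spark's actual fix) is an atomic handoff
flag, so that exactly one of release/kill ever acts on the worker:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch only: a single compare-and-set decides the worker's fate, so a kill
// that loses the race to a release becomes a no-op instead of leaving a dead
// worker in the idle queue.
final class GuardedWorker(worker: Worker) {
  private val decided = new AtomicBoolean(false)

  def tryRelease(): Unit =
    if (decided.compareAndSet(false, true)) IdlePool.release(worker)

  def tryKill(): Unit =
    if (decided.compareAndSet(false, true)) worker.kill()
  // else: the worker was already handed back for reuse and must not be killed
}
```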
Important notes:
* Prior to SPARK-47565, this would result in a crash (failing the task), since
the now-closed worker would be reused.
* With SPARK-47565, this is less of an issue, since the worker is checked to be
alive before it is reused (sketched after this list).
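In the terms of the earlier sketch, that SPARK-47565-style liveness check
amounts to something like the following (a hedged sketch, not the actual
patch):

```scala
// Skip dead workers when pulling from the idle queue instead of crashing on
// first use. Terminates because create() forks a fresh, alive worker once the
// queue is empty.
object CheckedCreate {
  def createAlive(): Worker = {
    var w = IdlePool.create()
    while (!w.alive) w = IdlePool.create()
    w
  }
}
```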
> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
> Key: SPARK-54217
> URL: https://issues.apache.org/jira/browse/SPARK-54217
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Steven Tran
> Priority: Major
>