[
https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Steven Tran updated SPARK-54217:
--------------------------------
Description:
In daemon mode,
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala]
reuses Python workers where possible: a worker that successfully completed its
last-assigned task is handed back (via
[releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650])
and placed into the idle queue, to be picked up by the next
[createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
call.
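To make the reuse lifecycle concrete, here is a minimal sketch of that pattern. The method names mirror the real API, but the types and bodies are simplified assumptions, not Spark's actual implementation:

```scala
import scala.collection.mutable

// Simplified stand-in for a daemon-forked Python worker (illustrative only).
final class Worker(val id: Int) {
  @volatile var alive: Boolean = true
  def kill(): Unit = alive = false
}

// Daemon-mode reuse pattern: a worker released after a clean task completion
// is parked in an idle queue and handed out again by the next create call.
final class WorkerFactorySketch {
  private val idleWorkers = mutable.Queue.empty[Worker]
  private var nextId = 0

  def createPythonWorker(): Worker = synchronized {
    if (idleWorkers.nonEmpty) idleWorkers.dequeue() // reuse a released worker
    else { nextId += 1; new Worker(nextId) }        // otherwise start a fresh one
  }

  def releasePythonWorker(worker: Worker): Unit = synchronized {
    idleWorkers.enqueue(worker) // last task finished cleanly; park for reuse
  }
}
```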
However, there is a race condition that can result in a released worker in the
[PythonWorkerFactory idle
queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
getting killed. That is, `PythonRunner` lacks synchronization between two decisions (sketched after this list):
* the main task thread's decision to release its associated Python worker
(when work is complete), and
* the `MonitorThread`'s decision to kill the associated Python worker (when
requested by the executor, e.g. speculative execution where another attempt
succeeds).
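A condensed sketch of those two uncoordinated paths, reusing the types from the sketch above (the thread structure is assumed for illustration; it is not Spark's code):

```scala
// The main task thread and the MonitorThread each act on the same worker,
// with no shared state recording whether the other side has already acted.
final class RunnerSketch(factory: WorkerFactorySketch) {
  private val worker = factory.createPythonWorker()

  // Main task thread: on END_OF_STREAM, hand the worker back for reuse.
  def onTaskCompleted(): Unit = factory.releasePythonWorker(worker)

  // MonitorThread: on a kill request from the executor, destroy the worker,
  // even if it has already been released into the idle queue.
  def onKillRequested(): Unit = worker.kill()
}
```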
So, the following sequence of events is possible (reproduced in the sketch after this list):
# `PythonRunner` is running
# The Python worker finishes its work and writes `END_OF_STREAM` to signal
back to `PythonRunner`'s main task thread that it is done
# `PythonRunner`'s main task thread receives this signal and releases the
worker for reuse
# Executor decides to kill this task (e.g. speculative execution)
# `PythonRunner`'s `MonitorThread` acts on this request and kills the
already-relinquished `PythonWorker`
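Driving the sketches above through exactly this sequence shows the consequence: the next `createPythonWorker` call hands out a dead worker. (A hypothetical reproduction, not a real Spark test.)

```scala
object RaceDemo {
  def main(args: Array[String]): Unit = {
    val factory = new WorkerFactorySketch
    val runner  = new RunnerSketch(factory)

    runner.onTaskCompleted()  // steps 2-3: END_OF_STREAM seen, worker released
    runner.onKillRequested()  // steps 4-5: monitor kills the relinquished worker

    val reused = factory.createPythonWorker() // dequeues the killed worker
    println(s"reused worker alive = ${reused.alive}") // prints: false
  }
}
```

One natural shape of a fix would be to make release vs. kill a single atomic decision (e.g. a compare-and-set flag consulted by both threads) so that at most one side ever acts on the worker; whether the eventual patch takes that form is left open here.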
> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
> Key: SPARK-54217
> URL: https://issues.apache.org/jira/browse/SPARK-54217
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0
> Reporter: Steven Tran
> Priority: Major