[ https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Tran updated SPARK-54217:
--------------------------------
    Description: 
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala] in daemon mode facilitates worker reuse where possible: as long as a worker successfully completed its last assigned task, it is released (via [releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]) into the idle queue, to be picked up by the next [createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302] call.
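
For context, a minimal, self-contained sketch of that reuse pattern (hypothetical names like `WorkerFactorySketch`; the real `PythonWorkerFactory` also manages daemon sockets, environment setup, and worker health):

```scala
import scala.collection.mutable

// Stand-in for a daemon-forked Python worker process (hypothetical).
class Worker(val id: Int) {
  def kill(): Unit = println(s"worker $id killed")
}

class WorkerFactorySketch {
  private val idleWorkers = mutable.Queue.empty[Worker]
  private var nextId = 0

  // Mirrors createPythonWorker: prefer an idle worker over forking a new one.
  def createWorker(): Worker = synchronized {
    if (idleWorkers.nonEmpty) idleWorkers.dequeue()
    else { nextId += 1; new Worker(nextId) }
  }

  // Mirrors releasePythonWorker: a healthy worker goes back on the idle queue.
  def releaseWorker(worker: Worker): Unit = synchronized {
    idleWorkers.enqueue(worker)
  }
}
```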

 

However, there is a race condition that can result in a released worker in the [PythonWorkerFactory idle queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116] getting killed; that is, `PythonRunner` lacks synchronization between:
 * the main task thread's decision to release its associated Python worker (when work is complete), and
 * the `MonitorThread`'s decision to kill that same worker (when requested by the executor, e.g. during speculative execution where another attempt succeeds), as sketched below.
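
Reusing the types from the sketch above, the unsynchronized shape of the problem looks roughly like this (again hypothetical names; the real code paths live in `PythonRunner` and its `MonitorThread`):

```scala
// Two threads, two independent decisions about the same worker, no shared
// lock or state: nothing stops kill() from running after complete().
class RunnerSketch(factory: WorkerFactorySketch, worker: Worker) {
  // Main task thread, after reading END_OF_STREAM from the worker:
  def complete(): Unit = factory.releaseWorker(worker)

  // MonitorThread, after the executor asks for the task to be killed:
  def kill(): Unit = worker.kill() // may destroy a worker already in the idle queue
}
```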

So, the following sequence of events is possible:
 # `PythonRunner` is running
 # The Python worker finishes its work and writes `END_OF_STREAM` to signal to `PythonRunner`'s main task thread that it is done
 # `PythonRunner`'s main task thread receives this signal and releases the worker for reuse
 # The executor decides to kill this task (e.g. because a speculative attempt elsewhere succeeded)
 # `PythonRunner`'s `MonitorThread` receives the kill request and kills the already-relinquished `PythonWorker`
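
To make the gap concrete, here is one shape the missing synchronization could take, under the same hypothetical types (a sketch only, not a proposed patch): a single `relinquished` state transition taken under a lock, so that release and kill become mutually exclusive.

```scala
class GuardedRunnerSketch(factory: WorkerFactorySketch, worker: Worker) {
  private var relinquished = false

  // Main task thread: release only if the MonitorThread has not killed the worker.
  def complete(): Unit = synchronized {
    if (!relinquished) {
      relinquished = true
      factory.releaseWorker(worker)
    }
  }

  // MonitorThread: kill only if the worker was not already released; once
  // released, it belongs to the idle queue and must be left alone.
  def kill(): Unit = synchronized {
    if (!relinquished) {
      relinquished = true
      worker.kill()
    }
  }
}
```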


> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-54217
>                 URL: https://issues.apache.org/jira/browse/SPARK-54217
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Steven Tran
>            Priority: Major
>


