[ 
https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Tran updated SPARK-54217:
--------------------------------
    Description: 
In daemon mode, 
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
 allows a worker to be reused where possible, as long as the worker 
successfully completed its last assigned task (via 
[releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
 The worker is released into the idle queue, to be picked up by the next 
[createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
 call.
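
To make the reuse lifecycle concrete, here is a minimal sketch of that 
release/create flow (simplified, with a hypothetical `WorkerPool` standing in 
for the real `PythonWorkerFactory` logic):

{code:scala}
import scala.collection.mutable

// Stand-in for the handle to a daemon-forked worker process.
final case class PythonWorker(pid: Int)

class WorkerPool {
  private val idleWorkers = mutable.Queue.empty[PythonWorker]
  private var nextPid = 1000

  // releasePythonWorker: a worker that finished its task cleanly is enqueued.
  def releasePythonWorker(worker: PythonWorker): Unit = synchronized {
    idleWorkers.enqueue(worker)
  }

  // createPythonWorker: prefer an idle worker over forking a new one.
  def createPythonWorker(): PythonWorker = synchronized {
    if (idleWorkers.nonEmpty) {
      idleWorkers.dequeue() // reuse
    } else {
      nextPid += 1
      PythonWorker(nextPid) // stand-in for asking the daemon to fork
    }
  }
}
{code}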

 

However, there is a race condition that can result in a released worker in the 
[PythonWorkerFactory idle 
queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
 being killed. That is, `PythonRunner` lacks synchronization between the 
following two decisions (a standalone sketch of the race follows this list):
 * the main task thread's decision to release its associated Python worker 
(when work is complete), and
 * the `MonitorThread`'s decision to kill the associated Python worker (when 
requested by the executor, e.g. speculative execution where another attempt 
succeeds).
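
To illustrate the missing synchronization, here is a minimal standalone sketch 
(hypothetical `Worker`/`RaceSketch` names, not the actual `PythonRunner` code) 
in which the task thread hands the worker back while the `MonitorThread` 
independently kills the same process:

{code:scala}
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for the daemon worker process; `alive` models the OS process state.
final class Worker(val pid: Int) { @volatile var alive = true }

object RaceSketch {
  private val idleWorkers = new ConcurrentLinkedQueue[Worker]()

  def main(args: Array[String]): Unit = {
    val worker = new Worker(pid = 12345)

    // Main task thread: saw END_OF_STREAM, hands the worker back for reuse.
    val taskThread = new Thread(() => { idleWorkers.add(worker); () })

    // MonitorThread: the executor requested a task kill, so "kill" the process.
    val monitorThread = new Thread(() => { worker.alive = false })

    taskThread.start(); monitorThread.start()
    taskThread.join(); monitorThread.join()

    // Nothing coordinated the two decisions: the already-relinquished worker
    // can end up dead inside the idle queue, ready to be handed to a new task.
    val reused = idleWorkers.poll()
    if (reused != null && !reused.alive) {
      println(s"dead worker ${reused.pid} handed out from the idle queue")
    }
  }
}
{code}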

So, the following sequence of events is possible:
 # `PythonRunner` is running
 # The Python worker finishes its work and writes `END_OF_STREAM` to signal 
back to `PythonRunner`'s main task thread that it is done
 # [`PythonRunner`'s main task thread receives this instruction and releases 
the worker for 
reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647]
 # For a separate reason, the executor decides to kill this task (e.g. 
speculative execution, because another attempt succeeded)
 # [`PythonRunner`'s `MonitorThread` acts on the kill request and kills the 
already-relinquished 
`PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696]
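
One possible shape of a fix, shown only as an illustration (this is not an 
actual Spark patch): gate both decisions on a single atomic handoff flag, so 
that exactly one of the release or the kill wins:

{code:scala}
import java.util.concurrent.atomic.AtomicBoolean

// Illustration only: a worker handle whose fate (released OR killed) can be
// decided exactly once, via compare-and-set.
final class GuardedWorker(val pid: Int) {
  private val fateDecided = new AtomicBoolean(false)

  // Main task thread, after END_OF_STREAM: only the CAS winner enqueues.
  def tryRelease(returnToIdleQueue: () => Unit): Boolean = {
    if (fateDecided.compareAndSet(false, true)) { returnToIdleQueue(); true }
    else false
  }

  // MonitorThread, on a task-kill request: only the CAS winner kills.
  def tryKill(killProcess: () => Unit): Boolean = {
    if (fateDecided.compareAndSet(false, true)) { killProcess(); true }
    else false
  }
}
{code}

With a guard like this, if the task thread wins, the `MonitorThread` skips the 
kill (the worker now belongs to the idle queue, not the task); if the 
`MonitorThread` wins, the release is skipped and a dead worker never reaches 
the queue.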



> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-54217
>                 URL: https://issues.apache.org/jira/browse/SPARK-54217
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Steven Tran
>            Priority: Major
>


