[ https://issues.apache.org/jira/browse/SPARK-54217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Tran updated SPARK-54217:
--------------------------------
    Description: 
[PythonWorkerFactory|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala]
 in daemon mode allows workers to be reused, where possible, as long as the
worker successfully completed its last-assigned task (via
[releasePythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650]).
 The released worker is placed into the idle queue, to be picked up by the next
[createPythonWorker|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302]
 call.
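
For illustration, here is a minimal sketch of this reuse pattern (the names and
types are hypothetical, not the actual PythonWorkerFactory API): an idle queue
that create-style calls drain before forking a new worker process, and that
release-style calls refill.

{code:scala}
import scala.collection.mutable

// Hypothetical simplification of daemon-mode worker reuse: released workers
// go into an idle queue and are handed back out before any new worker
// process is forked. All names here are illustrative only.
class SimpleWorkerFactory[W](forkNewWorker: () => W) {
  private val idleWorkers = new mutable.Queue[W]()

  // Called by a task that needs a Python worker.
  def create(): W = synchronized {
    if (idleWorkers.nonEmpty) idleWorkers.dequeue() // reuse an idle worker
    else forkNewWorker()                            // otherwise fork a fresh one
  }

  // Called once a task has finished successfully and its worker can be reused.
  def release(worker: W): Unit = synchronized {
    idleWorkers.enqueue(worker)
  }
}
{code}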

 

However, there is a race condition in which a worker that has already been
released into the
[PythonWorkerFactory idle queue|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116]
 can still get killed. Specifically, `PythonRunner` lacks synchronization
between the following two decisions (sketched in code after this list):
 * the main task thread's decision to release its associated Python worker
(when its work is complete), and
 * the `MonitorThread`'s decision to kill the associated Python worker (when
the executor requests it, e.g. during speculative execution when another
attempt of the task succeeds).
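
A heavily condensed sketch of those two code paths (hypothetical types and
method names, not the actual PythonRunner internals) makes the missing
coordination visible: nothing orders the release against the kill.

{code:scala}
// Hypothetical, heavily condensed view of the two threads involved.
// Nothing below coordinates the release with the kill; that is the race.
trait PyWorker {
  def stop(): Unit // destroys the worker process
}

class RunnerSketch(worker: PyWorker, releaseToIdleQueue: PyWorker => Unit) {

  // Main task thread: on END_OF_STREAM, hand the worker back for reuse.
  def onEndOfStream(): Unit = {
    releaseToIdleQueue(worker) // worker now sits in the idle queue
  }

  // MonitorThread: on a task-kill request, destroy the worker process.
  // This can run after onEndOfStream(), killing a worker that has already
  // been released and may be handed out to another task.
  def onTaskKilled(): Unit = {
    worker.stop()
  }
}
{code}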

So the following sequence of events is possible:
 # `PythonRunner` is running
 # The Python worker finishes its work and writes `END_OF_STREAM` to signal
back to `PythonRunner`'s main task thread that it is done
 # [`PythonRunner`'s main task thread receives this signal and releases the
worker for
reuse|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647]
 # Independently, the executor decides to kill this task (e.g. because a
speculative attempt of the same task has succeeded)
 # [`PythonRunner`'s `MonitorThread` receives the kill request and kills the
already-released
`PythonWorker`|https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696]

 

So the next task that pulls this Python worker from the idle queue will get a
dead Python worker.
 
Important notes:
* Prior to SPARK-47565, this would crash (and fail) the task, because the
now-closed worker would be reused.
* With SPARK-47565, this is less of an issue, because we check that the worker
is alive before we use it (a minimal illustration of that kind of check
follows).
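
The sketch below only illustrates the idea of such a liveness check; it is not
the actual SPARK-47565 change, and the names and the use of a raw SocketChannel
to stand in for a worker are assumptions.

{code:scala}
import java.nio.channels.SocketChannel
import scala.collection.mutable

// Hypothetical sketch: skip dead workers when handing out idle workers, in
// the spirit of the aliveness check mentioned above. A worker is modelled
// as a bare SocketChannel purely for illustration.
class CheckedWorkerFactory(forkNewWorker: () => SocketChannel) {
  private val idleWorkers = new mutable.Queue[SocketChannel]()

  def create(): SocketChannel = synchronized {
    // Drop any dead workers sitting at the front of the idle queue.
    while (idleWorkers.nonEmpty && !idleWorkers.head.isOpen) {
      idleWorkers.dequeue()
    }
    if (idleWorkers.nonEmpty) idleWorkers.dequeue() else forkNewWorker()
  }

  def release(worker: SocketChannel): Unit = synchronized {
    idleWorkers.enqueue(worker)
  }
}
{code}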


> PythonRunner does not synchronize Python worker kill/release decisions
> ----------------------------------------------------------------------
>
>                 Key: SPARK-54217
>                 URL: https://issues.apache.org/jira/browse/SPARK-54217
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Steven Tran
>            Priority: Major
>



