This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e3bc281c58d3 [SPARK-54217][PYTHON] Synchronize PythonRunner's MonitorThread kill decision
e3bc281c58d3 is described below
commit e3bc281c58d3cd0f462c5b838632f2efd32f9e86
Author: st-tran <[email protected]>
AuthorDate: Thu Jan 22 07:16:49 2026 +0900
[SPARK-54217][PYTHON] Synchronize PythonRunner's MonitorThread kill decision
### What changes were proposed in this pull request?
This PR addresses the synchronization issue described in SPARK-54217 by
respecting the existing `releasedOrClosed` AtomicBoolean in PythonRunner's
kill codepath; that flag is currently consulted only in the "released"
codepath, not the "closed" one. In doing so, we avoid erroneously destroying
a still-healthy Python worker; in the current state, the worker is destroyed
and a new one is created.
[Jira ticket description follows...](https://issues.apache.org/jira/browse/SPARK-54217)
[PythonWorkerFactory](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala)
in daemon mode will allow for worker reuse, where possible, as long as the
worker successfully completed its last-assigned task (via
[releasePythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650)).
The worker wi [...]
However, there is a race condition that can result in a released worker in
the [PythonWorkerFactory idle
queue](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116)
being killed; that is, the `PythonRunner` lacks synchronization between:
1. the main task thread's decision to release its associated Python worker
(when work is complete), and
2. the `MonitorThread`'s decision to kill the associated Python worker
(when requested by the executor, e.g. speculative execution where another
attempt succeeds).
So, the following sequence of events is possible:
1. `PythonRunner` is running
2. The Python worker finishes its work and writes `END_OF_STREAM` to signal
back to `PythonRunner`'s main task thread that it is done
3. [`PythonRunner`'s main task thread receives this instruction and
releases the worker for
reuse](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647)
4. For a separate reason, the executor decides to kill this task (e.g.
speculative execution, where another attempt has succeeded)
5. [`PythonRunner`'s `MonitorThread` receives this instruction and kills
the already-relinquished
`PythonWorker`](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696)
So the next task that pulls this Python worker from the idle pool will have
a dead Python worker.
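The fix closes this window by making both decisions race on the same flag.
The sketch below is a minimal, hypothetical model of that pattern (the class
and field names are illustrative stand-ins, not Spark's actual code): a
"release" thread and a "kill" thread both attempt `compareAndSet(false, true)`
on a shared `AtomicBoolean`, so exactly one of them can act on the worker.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the guard pattern this fix applies: the main task
// thread's "release" decision and the MonitorThread's "kill" decision both
// race on a shared AtomicBoolean, so exactly one can win.
public class ReleaseOrKillRace {

    public static boolean[] run() {
        AtomicBoolean releasedOrClosed = new AtomicBoolean(false);
        boolean[] outcome = new boolean[2]; // [0] = released, [1] = killed

        // Stand-in for the main task thread releasing the worker for reuse.
        Thread releaser = new Thread(() -> {
            if (releasedOrClosed.compareAndSet(false, true)) {
                outcome[0] = true; // worker goes back to the idle queue
            }
        });

        // Stand-in for the MonitorThread killing the worker on task kill.
        Thread monitor = new Thread(() -> {
            if (releasedOrClosed.compareAndSet(false, true)) {
                outcome[1] = true; // worker process is destroyed
            }
        });

        releaser.start();
        monitor.start();
        try {
            releaser.join();
            monitor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return outcome;
    }

    public static void main(String[] args) {
        boolean[] o = run();
        // compareAndSet guarantees exactly one path wins, never both.
        System.out.println("released=" + o[0] + ", killed=" + o[1]);
    }
}
```

However the scheduler interleaves the two threads, at most one
`compareAndSet` succeeds, which is exactly why reusing the flag in the kill
codepath prevents the MonitorThread from destroying an already-released
worker.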
### Why are the changes needed?
In the latest Spark release this change is NOT critical; however, it avoids
unnecessarily killing a still-healthy Python worker, which would result in
another one being created.
* Prior to
[SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), this would
result in a crash (failing the task) as we would reuse this now-closed worker.
* With [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565),
this is less of an issue, as we check that the worker is alive before we use it.
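That liveness check can be illustrated with a small, hypothetical sketch (the
`Worker` class and queue shape below are stand-ins, not `PythonWorkerFactory`'s
real API): workers pulled from the idle queue that turn out to be dead are
discarded rather than reused.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of discarding dead workers from an idle queue,
// mirroring the "process from idle queue is dead, discarding" log line.
public class IdleQueueSketch {

    // Minimal stand-in for a pooled Python worker.
    public static final class Worker {
        private final boolean alive;
        public Worker(boolean alive) { this.alive = alive; }
        public boolean isAlive() { return alive; }
    }

    // Pop workers until a live one is found; dead ones are dropped.
    public static Worker takeLiveWorker(Deque<Worker> idleQueue) {
        while (!idleQueue.isEmpty()) {
            Worker w = idleQueue.poll();
            if (w.isAlive()) {
                return w; // safe to reuse
            }
            // dead worker: discard it and keep looking
        }
        return null; // caller must spawn a fresh worker
    }

    public static void main(String[] args) {
        Deque<Worker> idle = new ArrayDeque<>();
        idle.add(new Worker(false)); // killed by the race described above
        idle.add(new Worker(true));
        Worker w = takeLiveWorker(idle);
        System.out.println("got live worker: " + (w != null && w.isAlive()));
    }
}
```

With this mitigation a dead worker costs only a discard and a respawn, which
is why the synchronization fix here is an optimization rather than a
correctness fix on recent releases.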
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This change is very hard (if not impossible) to unit test directly.
However, I've created a minimal repro of the issue. The error occurs when
running without this change, and goes away with this change.
* Without this change: details in SPARK-54217; we see the following log
line indicating that we are attempting to reuse killed Python workers:
* `PythonWorker(java.nio.channels.SocketChannel[closed]) process from
idle queue is dead, discarding.`
* With this change:
* First, cherry-pick [this PR](https://github.com/st-tran/spark/pull/1)
onto this fix.
* Then, rerun the job with this new build and we see that the errors go
away:
```
sttranStevens-MacBook-Pro ~/sttran-spark/work
% ls -lathr
total 0
drwxr-xr-x 18 sttran staff 576B Nov 5 21:22 app-20251105212207-0012 # Without the fix (has errors shown in https://github.com/st-tran/spark/pull/1)
drwxr-xr-x 53 sttran staff 1.7K Nov 5 22:22 ..
drwxr-xr-x 4 sttran staff 128B Nov 5 22:29 .
drwxr-xr-x 18 sttran staff 576B Nov 5 22:29 app-20251105222956-0013 # With the fix (no errors per search below)
sttranStevens-MacBook-Pro ~/sttran-spark/work
% grep -nri discard *0013
sttranStevens-MacBook-Pro ~/sttran-spark/work
%
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52915 from st-tran/sttran/synchronize-pythonrunner-release-destroy.
Authored-by: st-tran <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/api/python/PythonRunner.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 5f37e1f67b12..bc0e9f2beb2f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -350,10 +350,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// SPARK-35009: avoid creating multiple monitor threads for the same python worker
// and task context
if (PythonRunner.runningMonitorThreads.add(key)) {
- new MonitorThread(SparkEnv.get, worker, context).start()
+ new MonitorThread(SparkEnv.get, worker, context, releasedOrClosed).start()
}
} else {
- new MonitorThread(SparkEnv.get, worker, context).start()
+ new MonitorThread(SparkEnv.get, worker, context, releasedOrClosed).start()
}
// Return an iterator that read lines from the process's stdout
@@ -725,7 +725,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
* interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
* threads can block indefinitely.
*/
- class MonitorThread(env: SparkEnv, worker: PythonWorker, context: TaskContext)
+ class MonitorThread(
+ env: SparkEnv,
+ worker: PythonWorker,
+ context: TaskContext,
+ releasedOrClosed: AtomicBoolean)
extends Thread(s"Worker Monitor for $pythonExec") {
/** How long to wait before killing the python worker if a task cannot be interrupted. */
@@ -741,7 +745,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
if (!context.isCompleted()) {
Thread.sleep(taskKillTimeout)
- if (!context.isCompleted()) {
+ if (!context.isCompleted() && releasedOrClosed.compareAndSet(false, true)) {
try {
logWarning(log"Incomplete task interrupted: Attempting to kill Python Worker - " +
log"${MDC(TASK_NAME, taskIdentifier(context))}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]