This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d66350a2aad [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O
d66350a2aad is described below
commit d66350a2aadd7f1e612cc9cf54009ea6f531630e
Author: Ankur Dave <[email protected]>
AuthorDate: Wed Apr 13 16:58:18 2022 +0900
[SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O
### What changes were proposed in this pull request?
This PR cherry-picks https://github.com/apache/spark/pull/36065 to
branch-3.2.
---
When calling a Python UDF on a DataFrame with large rows, a deadlock can
occur involving the following three threads:
1. The Scala task executor thread. During task execution, this is
responsible for reading output produced by the Python process. However, in this
case the task has finished early, and this thread is no longer reading output
produced by the Python process. Instead, it is waiting for the Scala
WriterThread to exit so that it can finish the task.
2. The Scala WriterThread. This is trying to send a large row to the Python
process, and is waiting for the Python process to read that row.
3. The Python process. This is trying to send a large output to the Scala
task executor thread, and is waiting for that thread to read that output, which
will never happen.
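As an illustration (this is not Spark code), the blocking-write half of this deadlock can be reproduced with an ordinary OS pipe: a write larger than the pipe buffer blocks until a reader drains it, just as the WriterThread blocks on the socket. The 1 MiB payload and 0.5 s timeout below are arbitrary:

```python
import os
import threading

r, w = os.pipe()
payload = b"x" * (1 << 20)  # 1 MiB, far larger than a typical pipe buffer

def writer():
    # Blocks once the pipe buffer (often ~64 KiB) fills up, because
    # nothing is draining the read end yet.
    os.write(w, payload)
    os.close(w)

t = threading.Thread(target=writer, daemon=True)
t.start()
t.join(timeout=0.5)
blocked = t.is_alive()  # the writer is stuck, like the Scala WriterThread

# Draining the pipe (what the task executor thread normally does for the
# Python process's output) unblocks the writer.
received = b""
while True:
    chunk = os.read(r, 1 << 16)
    if not chunk:
        break
    received += chunk
t.join()
os.close(r)
```

In the real deadlock the reader side never drains the channel, so the writer stays blocked forever; the sketch drains it only so that it terminates.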
We considered the following three solutions for the deadlock:
1. When the task completes, make the Scala task executor thread close the
socket before waiting for the Scala WriterThread to exit. If the WriterThread
is blocked on a large write, this would interrupt that write and allow the
WriterThread to exit. However, it would prevent Python worker reuse.
2. Modify PythonWorkerFactory to use interruptible I/O.
[java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer))
supports interruptible blocking operations. The goal is that when the
WriterThread is interrupted, it should exit even if it was blocked on a large
write. However, this would be invasive.
3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread
to detect this deadlock and kill the Python worker. The MonitorThread currently
kills the Python worker only if the task itself is interrupted. In this case,
the task completes normally, so the MonitorThread does not take action. We want
the new watchdog thread (WriterMonitorThread) to detect that the task is
completed but the Python writer thread has not stopped, indicating a deadlock.
This PR implements Option 3.
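The shape of Option 3 (a daemon watchdog that frees a blocked writer by closing the channel) can be sketched in Python. This is a simplified analogy, not the Scala implementation: an `Event` stands in for task completion, a short sleep stands in for `PYTHON_TASK_KILL_TIMEOUT`, and closing the pipe's read end plays the role of destroying the Python worker, which closes the remote end of the socket and makes the blocked write fail:

```python
import os
import threading
import time

r, w = os.pipe()

def writer():
    # Plays the role of the Scala WriterThread: blocks on a large write
    # because nothing is draining the other side of the channel.
    try:
        os.write(w, b"x" * (1 << 20))
    except OSError:
        pass  # the blocked write fails once the watchdog closes the channel

writer_thread = threading.Thread(target=writer, daemon=True)
writer_thread.start()

task_completed = threading.Event()
kill_timeout = 0.2  # illustrative stand-in for the task-kill timeout

def watchdog():
    # Wait until the task completes (or the writer exits on its own,
    # in which case there is nothing to do).
    while not task_completed.is_set() and writer_thread.is_alive():
        time.sleep(0.05)
    if writer_thread.is_alive():
        time.sleep(kill_timeout)
        # Writer still running after the grace period: assume deadlock and
        # close the channel so the blocked write raises and the writer exits.
        if writer_thread.is_alive():
            os.close(r)

threading.Thread(target=watchdog, daemon=True).start()

task_completed.set()  # the task finishes while the writer is still blocked
writer_thread.join(timeout=10)
freed = not writer_thread.is_alive()
os.close(w)
```

The actual WriterMonitorThread (in the diff below) follows the same structure: poll until the task completes, wait one timeout, and kill the worker only if the writer thread is still alive.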
### Why are the changes needed?
To fix a deadlock that can cause PySpark queries to hang.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test that previously encountered the deadlock and timed out, and
now succeeds.
Closes #36172 from HyukjinKwon/SPARK-38677-3.2.
Authored-by: Ankur Dave <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/api/python/PythonRunner.scala | 49 ++++++++++++++++++++++
python/pyspark/tests/test_rdd.py | 35 ++++++++++++++++
2 files changed, 84 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66b23782cf9..fabff970f2b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -181,6 +181,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
writerThread.start()
+    new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
if (reuseWorker) {
val key = (worker, context.taskAttemptId)
// SPARK-35009: avoid creating multiple monitor threads for the same python worker
@@ -643,6 +644,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
}
}
+
+  /**
+   * This thread monitors the WriterThread and kills it in case of deadlock.
+   *
+   * A deadlock can arise if the task completes while the writer thread is sending input to the
+   * Python process (e.g. due to the use of `take()`), and the Python process is still producing
+   * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of
+   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket.
+   */
+  class WriterMonitorThread(
+      env: SparkEnv, worker: Socket, writerThread: WriterThread, context: TaskContext)
+    extends Thread(s"Writer Monitor for $pythonExec (writer thread id ${writerThread.getId})") {
+
+    /**
+     * How long to wait before closing the socket if the writer thread has not exited after the task
+     * ends.
+     */
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
+
+    setDaemon(true)
+
+    override def run(): Unit = {
+      // Wait until the task is completed (or the writer thread exits, in which case this thread has
+      // nothing to do).
+      while (!context.isCompleted && writerThread.isAlive) {
+        Thread.sleep(2000)
+      }
+      if (writerThread.isAlive) {
+        Thread.sleep(taskKillTimeout)
+        // If the writer thread continues running, this indicates a deadlock. Kill the worker to
+        // resolve the deadlock.
+        if (writerThread.isAlive) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.attemptNumber} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(
+              s"Detected deadlock while completing task $taskName: " +
+                "Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }
+  }
}
private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 93d4afd3364..81234a4031a 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -29,6 +29,7 @@ from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder,\
     TaskResourceRequests
 from pyspark.serializers import CloudPickleSerializer, BatchedSerializer, PickleSerializer,\
     MarshalSerializer, UTF8Deserializer, NoOpSerializer
+from pyspark.sql import SparkSession
 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
@@ -693,6 +694,40 @@ class RDDTests(ReusedPySparkTestCase):
rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()
+    def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
+        # Regression test for SPARK-38677.
+        #
+        # Create a DataFrame with many columns, call a Python function on each row, and take only
+        # the first result row.
+        #
+        # This produces large rows that trigger a deadlock involving the following three threads:
+        #
+        # 1. The Scala task executor thread. During task execution, this is responsible for reading
+        #    output produced by the Python process. However, in this case the task has finished
+        #    early, and this thread is no longer reading output produced by the Python process.
+        #    Instead, it is waiting for the Scala WriterThread to exit so that it can finish the
+        #    task.
+        #
+        # 2. The Scala WriterThread. This is trying to send a large row to the Python process, and
+        #    is waiting for the Python process to read that row.
+        #
+        # 3. The Python process. This is trying to send a large output to the Scala task executor
+        #    thread, and is waiting for that thread to read that output.
+        #
+        # For this test to succeed rather than hanging, the Scala MonitorThread must detect this
+        # deadlock and kill the Python worker.
+        import numpy as np
+        import pandas as pd
+
+        num_rows = 100000
+        num_columns = 134
+        data = np.zeros((num_rows, num_columns))
+        columns = map(str, range(num_columns))
+        df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data, columns=columns))
+        actual = PickleSerializer().loads(df.rdd.map(list)._jrdd.first())
+        expected = [list(data[0])]
+        self.assertEqual(expected, actual)
+
def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
# Regression test for SPARK-5969
seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence