This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d66350a2aad [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O
d66350a2aad is described below

commit d66350a2aadd7f1e612cc9cf54009ea6f531630e
Author: Ankur Dave <[email protected]>
AuthorDate: Wed Apr 13 16:58:18 2022 +0900

    [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O
    
    ### What changes were proposed in this pull request?
    
    This PR cherry-picks https://github.com/apache/spark/pull/36065 to branch-3.2.
    
    ---
    
    When calling a Python UDF on a DataFrame with large rows, a deadlock can occur involving the following three threads:

    1. The Scala task executor thread. During task execution, this is responsible for reading output produced by the Python process. However, in this case the task has finished early, and this thread is no longer reading output produced by the Python process. Instead, it is waiting for the Scala WriterThread to exit so that it can finish the task.
    2. The Scala WriterThread. This is trying to send a large row to the Python process, and is waiting for the Python process to read that row.
    3. The Python process. This is trying to send a large output to the Scala task executor thread, and is waiting for that thread to read that output, which will never happen.
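    At its core, the deadlock is two sides each blocked writing into a full OS buffer that the other side is not draining. The following standalone Python sketch (illustrative only, not Spark code; the function name is hypothetical) demonstrates the underlying mechanism: a blocking write larger than a pipe's kernel buffer stalls until the peer reads, just as the WriterThread stalls on the worker's stdin.

```python
import os
import threading

def demonstrate_blocking_write():
    """Show that a pipe write larger than the kernel buffer blocks
    until the other side reads, mirroring the WriterThread's stall."""
    r, w = os.pipe()
    payload = b"x" * (1 << 22)  # 4 MiB, well beyond typical pipe buffers

    def writer():
        os.write(w, payload)  # blocks mid-write while nobody reads
        os.close(w)

    t = threading.Thread(target=writer, daemon=True)
    t.start()

    t.join(timeout=0.5)
    was_blocked = t.is_alive()  # writer is stuck, like thread 2 above

    # Draining the pipe (the reader's role, thread 1 above) releases it.
    received = 0
    while received < len(payload):
        received += len(os.read(r, 1 << 16))
    t.join()
    os.close(r)
    return was_blocked, received == len(payload)

if __name__ == "__main__":
    print(demonstrate_blocking_write())  # (True, True)
```

    In the real deadlock no thread ever plays the reader's role, so the stall is permanent until something closes the socket.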
    
    We considered the following three solutions for the deadlock:
    
    1. When the task completes, make the Scala task executor thread close the socket before waiting for the Scala WriterThread to exit. If the WriterThread is blocked on a large write, this would interrupt that write and allow the WriterThread to exit. However, it would prevent Python worker reuse.
    2. Modify PythonWorkerFactory to use interruptible I/O. [java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)) supports interruptible blocking operations. The goal is that when the WriterThread is interrupted, it should exit even if it was blocked on a large write. However, this would be invasive.
    3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread to detect this deadlock and kill the Python worker. The MonitorThread currently kills the Python worker only if the task itself is interrupted. In this case, the task completes normally, so the MonitorThread does not take action. We want the new watchdog thread (WriterMonitorThread) to detect that the task is completed but the Python writer thread has not stopped, indicating a deadlock.
    
    This PR implements Option 3.
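    The Scala implementation is in the diff below. As a language-neutral sketch of the same watchdog pattern, the following Python (hypothetical names, not Spark's API; timeouts shortened for illustration) shows the Option 3 logic: wait until the task completes, allow a grace period, and only then forcibly release the blocked writer.

```python
import threading
import time

class WriterMonitor(threading.Thread):
    """Watchdog sketch of Option 3. Once the task completes, give the
    writer a grace period, then invoke a caller-supplied kill action
    (standing in for destroying the Python worker)."""

    def __init__(self, task_completed, writer, kill_timeout, kill_worker):
        super().__init__(daemon=True)
        self.task_completed = task_completed  # threading.Event set at task end
        self.writer = writer                  # thread doing blocking writes
        self.kill_timeout = kill_timeout      # grace period in seconds
        self.kill_worker = kill_worker        # e.g. closes the worker's socket

    def run(self):
        # Wait until the task completes (or the writer exits on its own,
        # in which case this thread has nothing to do).
        while not self.task_completed.is_set() and self.writer.is_alive():
            time.sleep(0.05)
        if self.writer.is_alive():
            # A healthy writer finishes shortly after the task does.
            self.writer.join(self.kill_timeout)
            if self.writer.is_alive():
                # Task done but writer still blocked: assume deadlock.
                self.kill_worker()

def run_demo():
    """Simulate the deadlock: the writer blocks indefinitely until the
    monitor 'kills the worker', which unblocks it."""
    stuck = threading.Event()    # stands in for a write on a full buffer
    killed = threading.Event()

    writer = threading.Thread(target=stuck.wait, daemon=True)
    writer.start()

    def kill_worker():
        killed.set()
        stuck.set()              # closing the socket frees the stuck write

    task_completed = threading.Event()
    WriterMonitor(task_completed, writer, 0.2, kill_worker).start()

    task_completed.set()         # task finishes while the writer is stuck
    writer.join(timeout=5)
    return killed.is_set(), writer.is_alive()

if __name__ == "__main__":
    print(run_demo())
```

    Note that, as in the real patch, the monitor only acts after the task has completed and the grace period has elapsed, so a writer that drains normally is never killed.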
    
    ### Why are the changes needed?
    To fix a deadlock that can cause PySpark queries to hang.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a test that previously encountered the deadlock and timed out, and now succeeds.
    
    Closes #36172 from HyukjinKwon/SPARK-38677-3.2.
    
    Authored-by: Ankur Dave <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/api/python/PythonRunner.scala | 49 ++++++++++++++++++++++
 python/pyspark/tests/test_rdd.py                   | 35 ++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66b23782cf9..fabff970f2b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -181,6 +181,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
 
     writerThread.start()
+    new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
     if (reuseWorker) {
       val key = (worker, context.taskAttemptId)
       // SPARK-35009: avoid creating multiple monitor threads for the same python worker
@@ -643,6 +644,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
     }
   }
+
+  /**
+   * This thread monitors the WriterThread and kills it in case of deadlock.
+   *
+   * A deadlock can arise if the task completes while the writer thread is sending input to the
+   * Python process (e.g. due to the use of `take()`), and the Python process is still producing
+   * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of
+   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket.
+   */
+  class WriterMonitorThread(
+      env: SparkEnv, worker: Socket, writerThread: WriterThread, context: TaskContext)
+    extends Thread(s"Writer Monitor for $pythonExec (writer thread id ${writerThread.getId})") {
+
+    /**
+     * How long to wait before closing the socket if the writer thread has not exited after the task
+     * ends.
+     */
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
+
+    setDaemon(true)
+
+    override def run(): Unit = {
+      // Wait until the task is completed (or the writer thread exits, in which case this thread has
+      // nothing to do).
+      while (!context.isCompleted && writerThread.isAlive) {
+        Thread.sleep(2000)
+      }
+      if (writerThread.isAlive) {
+        Thread.sleep(taskKillTimeout)
+        // If the writer thread continues running, this indicates a deadlock. Kill the worker to
+        // resolve the deadlock.
+        if (writerThread.isAlive) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.attemptNumber} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(
+              s"Detected deadlock while completing task $taskName: " +
+                "Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }
+  }
 }
 
 private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 93d4afd3364..81234a4031a 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -29,6 +29,7 @@ from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder,\
     TaskResourceRequests
 from pyspark.serializers import CloudPickleSerializer, BatchedSerializer, PickleSerializer,\
     MarshalSerializer, UTF8Deserializer, NoOpSerializer
+from pyspark.sql import SparkSession
 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
 
 
@@ -693,6 +694,40 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
+        # Regression test for SPARK-38677.
+        #
+        # Create a DataFrame with many columns, call a Python function on each row, and take only
+        # the first result row.
+        #
+        # This produces large rows that trigger a deadlock involving the following three threads:
+        #
+        # 1. The Scala task executor thread. During task execution, this is responsible for reading
+        #    output produced by the Python process. However, in this case the task has finished
+        #    early, and this thread is no longer reading output produced by the Python process.
+        #    Instead, it is waiting for the Scala WriterThread to exit so that it can finish the
+        #    task.
+        #
+        # 2. The Scala WriterThread. This is trying to send a large row to the Python process, and
+        #    is waiting for the Python process to read that row.
+        #
+        # 3. The Python process. This is trying to send a large output to the Scala task executor
+        #    thread, and is waiting for that thread to read that output.
+        #
+        # For this test to succeed rather than hanging, the Scala MonitorThread must detect this
+        # deadlock and kill the Python worker.
+        import numpy as np
+        import pandas as pd
+
+        num_rows = 100000
+        num_columns = 134
+        data = np.zeros((num_rows, num_columns))
+        columns = map(str, range(num_columns))
+        df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data, columns=columns))
+        actual = PickleSerializer().loads(df.rdd.map(list)._jrdd.first())
+        expected = [list(data[0])]
+        self.assertEqual(expected, actual)
+
     def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
         # Regression test for SPARK-5969
         seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence

