This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d76d29a0eb3 Revert "[SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O"
d76d29a0eb3 is described below

commit d76d29a0eb3cda3f9fe8100fd9e286080ccd6910
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Apr 13 13:49:51 2022 +0900

    Revert "[SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O"
    
    This reverts commit 6bd695e9abc19c57fe34772813c9e61627017349.
---
 .../org/apache/spark/api/python/PythonRunner.scala | 49 ----------------------
 python/pyspark/tests/test_rdd.py                   | 35 ----------------
 2 files changed, 84 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index fabff970f2b..66b23782cf9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -181,7 +181,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
 
     writerThread.start()
-    new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
     if (reuseWorker) {
       val key = (worker, context.taskAttemptId)
       // SPARK-35009: avoid creating multiple monitor threads for the same 
python worker
@@ -644,54 +643,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
     }
   }
-
-  /**
-   * This thread monitors the WriterThread and kills it in case of deadlock.
-   *
-   * A deadlock can arise if the task completes while the writer thread is 
sending input to the
-   * Python process (e.g. due to the use of `take()`), and the Python process 
is still producing
-   * output. When the inputs are sufficiently large, this can result in a 
deadlock due to the use of
-   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the 
socket.
-   */
-  class WriterMonitorThread(
-      env: SparkEnv, worker: Socket, writerThread: WriterThread, context: 
TaskContext)
-    extends Thread(s"Writer Monitor for $pythonExec (writer thread id 
${writerThread.getId})") {
-
-    /**
-     * How long to wait before closing the socket if the writer thread has not 
exited after the task
-     * ends.
-     */
-    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
-
-    setDaemon(true)
-
-    override def run(): Unit = {
-      // Wait until the task is completed (or the writer thread exits, in 
which case this thread has
-      // nothing to do).
-      while (!context.isCompleted && writerThread.isAlive) {
-        Thread.sleep(2000)
-      }
-      if (writerThread.isAlive) {
-        Thread.sleep(taskKillTimeout)
-        // If the writer thread continues running, this indicates a deadlock. 
Kill the worker to
-        // resolve the deadlock.
-        if (writerThread.isAlive) {
-          try {
-            // Mimic the task name used in `Executor` to help the user find 
out the task to blame.
-            val taskName = s"${context.partitionId}.${context.attemptNumber} " 
+
-              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
-            logWarning(
-              s"Detected deadlock while completing task $taskName: " +
-                "Attempting to kill Python Worker")
-            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
-          } catch {
-            case e: Exception =>
-              logError("Exception when trying to kill worker", e)
-          }
-        }
-      }
-    }
-  }
 }
 
 private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 9a023a16a13..93d4afd3364 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -29,7 +29,6 @@ from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder,\
     TaskResourceRequests
 from pyspark.serializers import CloudPickleSerializer, BatchedSerializer, 
PickleSerializer,\
     MarshalSerializer, UTF8Deserializer, NoOpSerializer
-from pyspark.sql import SparkSession
 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
 
 
@@ -694,40 +693,6 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
-    def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
-        # Regression test for SPARK-38677.
-        #
-        # Create a DataFrame with many columns, call a Python function on each 
row, and take only
-        # the first result row.
-        #
-        # This produces large rows that trigger a deadlock involving the 
following three threads:
-        #
-        # 1. The Scala task executor thread. During task execution, this is 
responsible for reading
-        #    output produced by the Python process. However, in this case the 
task has finished
-        #    early, and this thread is no longer reading output produced by 
the Python process.
-        #    Instead, it is waiting for the Scala WriterThread to exit so that 
it can finish the
-        #    task.
-        #
-        # 2. The Scala WriterThread. This is trying to send a large row to the 
Python process, and
-        #    is waiting for the Python process to read that row.
-        #
-        # 3. The Python process. This is trying to send a large output to the 
Scala task executor
-        #    thread, and is waiting for that thread to read that output.
-        #
-        # For this test to succeed rather than hanging, the Scala 
MonitorThread must detect this
-        # deadlock and kill the Python worker.
-        import numpy as np
-        import pandas as pd
-
-        num_rows = 100000
-        num_columns = 134
-        data = np.zeros((num_rows, num_columns))
-        columns = map(str, range(num_columns))
-        df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data, 
columns=columns))
-        actual = CPickleSerializer().loads(df.rdd.map(list)._jrdd.first())
-        expected = [list(data[0])]
-        self.assertEqual(expected, actual)
-
     def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
         # Regression test for SPARK-5969
         seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to