This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d76d29a0eb3 Revert "[SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O"
d76d29a0eb3 is described below
commit d76d29a0eb3cda3f9fe8100fd9e286080ccd6910
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Apr 13 13:49:51 2022 +0900
Revert "[SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O"
This reverts commit 6bd695e9abc19c57fe34772813c9e61627017349.
---
.../org/apache/spark/api/python/PythonRunner.scala | 49 ----------------------
python/pyspark/tests/test_rdd.py | 35 ----------------
2 files changed, 84 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index fabff970f2b..66b23782cf9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -181,7 +181,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
writerThread.start()
- new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
if (reuseWorker) {
val key = (worker, context.taskAttemptId)
// SPARK-35009: avoid creating multiple monitor threads for the same python worker
@@ -644,54 +643,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
}
}
-
- /**
- * This thread monitors the WriterThread and kills it in case of deadlock.
- *
- * A deadlock can arise if the task completes while the writer thread is
sending input to the
- * Python process (e.g. due to the use of `take()`), and the Python process
is still producing
- * output. When the inputs are sufficiently large, this can result in a
deadlock due to the use of
- * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the
socket.
- */
- class WriterMonitorThread(
- env: SparkEnv, worker: Socket, writerThread: WriterThread, context:
TaskContext)
- extends Thread(s"Writer Monitor for $pythonExec (writer thread id
${writerThread.getId})") {
-
- /**
- * How long to wait before closing the socket if the writer thread has not
exited after the task
- * ends.
- */
- private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
-
- setDaemon(true)
-
- override def run(): Unit = {
- // Wait until the task is completed (or the writer thread exits, in
which case this thread has
- // nothing to do).
- while (!context.isCompleted && writerThread.isAlive) {
- Thread.sleep(2000)
- }
- if (writerThread.isAlive) {
- Thread.sleep(taskKillTimeout)
- // If the writer thread continues running, this indicates a deadlock.
Kill the worker to
- // resolve the deadlock.
- if (writerThread.isAlive) {
- try {
- // Mimic the task name used in `Executor` to help the user find
out the task to blame.
- val taskName = s"${context.partitionId}.${context.attemptNumber} "
+
- s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
- logWarning(
- s"Detected deadlock while completing task $taskName: " +
- "Attempting to kill Python Worker")
- env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
- } catch {
- case e: Exception =>
- logError("Exception when trying to kill worker", e)
- }
- }
- }
- }
- }
}
private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 9a023a16a13..93d4afd3364 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -29,7 +29,6 @@ from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder,\
TaskResourceRequests
from pyspark.serializers import CloudPickleSerializer, BatchedSerializer,
PickleSerializer,\
MarshalSerializer, UTF8Deserializer, NoOpSerializer
-from pyspark.sql import SparkSession
from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
@@ -694,40 +693,6 @@ class RDDTests(ReusedPySparkTestCase):
rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()
- def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
- # Regression test for SPARK-38677.
- #
- # Create a DataFrame with many columns, call a Python function on each
row, and take only
- # the first result row.
- #
- # This produces large rows that trigger a deadlock involving the
following three threads:
- #
- # 1. The Scala task executor thread. During task execution, this is
responsible for reading
- # output produced by the Python process. However, in this case the
task has finished
- # early, and this thread is no longer reading output produced by
the Python process.
- # Instead, it is waiting for the Scala WriterThread to exit so that
it can finish the
- # task.
- #
- # 2. The Scala WriterThread. This is trying to send a large row to the
Python process, and
- # is waiting for the Python process to read that row.
- #
- # 3. The Python process. This is trying to send a large output to the
Scala task executor
- # thread, and is waiting for that thread to read that output.
- #
- # For this test to succeed rather than hanging, the Scala
MonitorThread must detect this
- # deadlock and kill the Python worker.
- import numpy as np
- import pandas as pd
-
- num_rows = 100000
- num_columns = 134
- data = np.zeros((num_rows, num_columns))
- columns = map(str, range(num_columns))
- df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data,
columns=columns))
- actual = CPickleSerializer().loads(df.rdd.map(list)._jrdd.first())
- expected = [list(data[0])]
- self.assertEqual(expected, actual)
-
def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
# Regression test for SPARK-5969
seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]