Repository: spark
Updated Branches:
  refs/heads/master 1e8233541 -> 03f2b7bff


[SPARK-22535][PYSPARK] Sleep before killing the python worker in PythonRunner.MonitorThread

## What changes were proposed in this pull request?

`PythonRunner.MonitorThread` should give the task a little time to finish 
before forcibly killing the python worker. This greatly reduces the chance of 
hitting the race condition. I also improved the log message to identify the 
task to blame when it's stuck.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #19762 from zsxwing/SPARK-22535.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03f2b7bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03f2b7bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03f2b7bf

Branch: refs/heads/master
Commit: 03f2b7bff7e537ec747b41ad22e448e1c141f0dd
Parents: 1e82335
Author: Shixiong Zhu <[email protected]>
Authored: Thu Nov 16 14:22:25 2017 +0900
Committer: Takuya UESHIN <[email protected]>
Committed: Thu Nov 16 14:22:25 2017 +0900

----------------------------------------------------------------------
 .../apache/spark/api/python/PythonRunner.scala  | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03f2b7bf/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index d417303..9989f68 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -337,6 +337,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
     extends Thread(s"Worker Monitor for $pythonExec") {
 
+    /** How long to wait before killing the python worker if a task cannot be interrupted. */
+    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+
     setDaemon(true)
 
     override def run() {
@@ -346,12 +349,18 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        try {
          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
-          env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
-        } catch {
-          case e: Exception =>
-            logError("Exception when trying to kill worker", e)
+        Thread.sleep(taskKillTimeout)
+        if (!context.isCompleted) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.taskAttemptId} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
         }
       }
     }

