Repository: spark
Updated Branches:
  refs/heads/branch-1.0 2250c7acb -> a1f8779a7


SPARK-1104: kill Process in workerThread of ExecutorRunner

As reported in https://spark-project.atlassian.net/browse/SPARK-1104

By @pwendell: "Sometimes due to large shuffles executors will take a long time 
shutting down. In particular this can happen if large numbers of shuffle files 
are around (this will be alleviated by SPARK-1103, but nonetheless...).

The symptom is you have DEAD workers sitting around in the UI and the existing 
workers keep trying to re-register but can't because they've been assumed dead."

In this patch, I add handling of InterruptedException in the workerThread of
ExecutorRunner, so that process.destroy() and process.waitFor() block only the
workerThread instead of blocking the Worker actor.

---------

Analysis: process.destroy() is a blocking method, i.e. it only returns once all
shutdown-hook threads have returned, so calling it on the Worker thread can make
the Worker block for a long while.

On what happens to shutdown hooks when the JVM process is killed, see:
http://www.tutorialspoint.com/java/lang/runtime_addshutdownhook.htm
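The pattern adopted by the patch can be sketched as follows. This is a minimal,
self-contained illustration (all names here are hypothetical, not the actual
ExecutorRunner code; it assumes a Unix `sleep` binary is available): the caller
only issues interrupt(), and the blocking destroy()/waitFor() pair runs inside
the interrupted thread's InterruptedException handler.

```scala
object KillPatternSketch {
  @volatile var process: Process = null

  // Blocking cleanup: destroy the child and wait for it to exit.
  // Whichever thread calls this is the one that blocks.
  def killProcess(): Unit = {
    if (process != null) {
      println("Killing process!")
      process.destroy()
      process.waitFor() // blocks this thread only
      process = null
    }
  }

  def main(args: Array[String]): Unit = {
    val workerThread = new Thread {
      override def run(): Unit = {
        try {
          process = new ProcessBuilder("sleep", "30").start()
          process.waitFor() // interrupted by the caller below
        } catch {
          // The blocking cleanup happens here, on the worker thread,
          // not on the thread that requested the kill.
          case _: InterruptedException => killProcess()
        }
      }
    }
    workerThread.start()
    Thread.sleep(500)
    workerThread.interrupt() // returns immediately; no blocking for the caller
    workerThread.join()
    println("caller not blocked")
  }
}
```

With this structure the analogue of the Worker actor (the caller of
interrupt()) never waits on the child process; only the worker thread does.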

Author: CodingCat <[email protected]>

Closes #35 from CodingCat/SPARK-1104 and squashes the following commits:

85767da [CodingCat] add null checking and remove unnecessary killProce
3107aeb [CodingCat] address Aaron's comments
eb615ba [CodingCat] kill the process when the error happens
0accf2f [CodingCat] set process to null after killed it
1d511c8 [CodingCat] kill Process in workerThread

(cherry picked from commit f99af8529b6969986f0c3e03f6ff9b7bb9d53ece)
Signed-off-by: Aaron Davidson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1f8779a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1f8779a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1f8779a

Branch: refs/heads/branch-1.0
Commit: a1f8779a7c461e6fe0b6cb6d57be7479afcc2c82
Parents: 2250c7a
Author: CodingCat <[email protected]>
Authored: Thu Apr 24 15:55:18 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Thu Apr 24 15:56:15 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/worker/ExecutorRunner.scala    | 31 +++++++++-----------
 1 file changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1f8779a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f94cd68..2051403 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner(
       override def run() { fetchAndRunExecutor() }
     }
     workerThread.start()
-
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        if (process != null) {
-          logInfo("Shutdown hook killing child process.")
-          process.destroy()
-          process.waitFor()
-        }
+        killProcess()
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
+  private def killProcess() {
+    if (process != null) {
+      logInfo("Killing process!")
+      process.destroy()
+      process.waitFor()
+    }
+  }
+
   /** Stop this executor runner, including killing the process it launched */
   def kill() {
     if (workerThread != null) {
+      // the workerThread will kill the child process when interrupted
       workerThread.interrupt()
       workerThread = null
-      if (process != null) {
-        logInfo("Killing process!")
-        process.destroy()
-        process.waitFor()
-      }
       state = ExecutorState.KILLED
       worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
@@ -128,7 +127,6 @@ private[spark] class ExecutorRunner(
       // parent process for the executor command
       env.put("SPARK_LAUNCH_WITH_SCALA", "0")
       process = builder.start()
-
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
 
@@ -148,14 +146,13 @@ private[spark] class ExecutorRunner(
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), 
Some(exitCode))
     } catch {
-      case interrupted: InterruptedException =>
+      case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-
+        killProcess()
+      }
       case e: Exception => {
         logError("Error running executor", e)
-        if (process != null) {
-          process.destroy()
-        }
+        killProcess()
         state = ExecutorState.FAILED
         val message = e.getClass + ": " + e.getMessage
         worker ! ExecutorStateChanged(appId, execId, state, Some(message), 
None)
