Repository: spark
Updated Branches:
  refs/heads/master 269fc62b2 -> ca5d9d43b


[SPARK-937] adding EXITED executor state and not relaunching cleanly exited 
executors

There seem to be two issues.

1. When a job is done, the driver asks the executor to shut down. However, the
Worker assigned this clean exit the FAILED executor state. I introduced the
EXITED executor state for executors that exit on their own (whether the exit
is normal or abnormal is indicated by the exit code).

2. When the Master is notified that an executor has exited, it launches another
one to replace it, regardless of why the executor exited. When the reason is
that the job has finished, the unnecessary replacement is subsequently killed
when the App disassociates. This launching and killing of unnecessary executors
shows up in the log and is confusing to users. I added a check on the executor
exit status to avoid launching (and subsequently killing) unnecessary
replacements when executors exit cleanly.

One could ask the scheduler to tell the Master that the job is done so that the
Master wouldn't launch the replacement executor. However, there is a race
condition between the App telling the Master that the job is done and the Worker
telling the Master that an executor has exited. There is no guarantee that the
former will happen before the latter. Instead, I chose to check the exit code
when an executor exits. If the exit code is 0, I assume the executor was asked
to shut down by the driver, and the Master will not launch a replacement.
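
As a rough illustration of the decision described above, here is a minimal,
standalone sketch. It is not the actual Master code: the object name, the
Decision type, and the decide function are hypothetical, maxRetries stands in
for ApplicationState.MAX_NUM_RETRY, and retryCount is assumed to be the
already incremented retry count.

```scala
// Hypothetical, simplified model of the relaunch decision; not the real Master.
object RelaunchSketch {
  sealed trait Decision
  case object NoRelaunch extends Decision      // clean exit: assume the driver asked for shutdown
  case object Relaunch extends Decision        // abnormal exit with retries remaining
  case object FailApplication extends Decision // abnormal exit and retry budget exhausted

  // exitStatus is None when no exit code was reported; that case is
  // treated as "not a clean exit", the same as any nonzero code.
  def decide(exitStatus: Option[Int], retryCount: Int, maxRetries: Int): Decision = {
    val normalExit = exitStatus.exists(_ == 0)
    if (normalExit) NoRelaunch
    else if (retryCount < maxRetries) Relaunch
    else FailApplication
  }
}
```

Under these assumptions, an exit code of 0 never triggers a replacement or an
application failure, while any other outcome goes through the existing retry
limit.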

Due to the race condition, it could also happen (although it didn't happen on
my local cluster) that the Master detects the App disassociation event before
the executor exits by itself. In such cases, the executor will be rightfully
killed and labeled as KILLED, while the App state will show FINISHED.

Author: Kan Zhang <[email protected]>

Closes #306 from kanzhang/SPARK-1118 and squashes the following commits:

cb0cc86 [Kan Zhang] [SPARK-937] adding EXITED executor state and not 
relaunching cleanly exited executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca5d9d43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca5d9d43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca5d9d43

Branch: refs/heads/master
Commit: ca5d9d43b93abd279079b3be8a06fdd78c595510
Parents: 269fc62
Author: Kan Zhang <[email protected]>
Authored: Sun Jun 15 14:55:34 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Jun 15 14:55:34 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/deploy/ExecutorState.scala    | 4 ++--
 .../main/scala/org/apache/spark/deploy/master/Master.scala    | 5 +++--
 .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7 +++----
 3 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca5d9d43/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 37dfa7f..9f34d01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca5d9d43/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c6dec30..33ffcbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -303,10 +303,11 @@ private[spark] class Master(
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus.exists(_ == 0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
               schedule()
-            } else {
+            } else if (!normalExit) {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
               removeApplication(appInfo, ApplicationState.FAILED)

http://git-wip-us.apache.org/repos/asf/spark/blob/ca5d9d43/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d09136d..6433aac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
