Repository: spark
Updated Branches:
  refs/heads/branch-1.1 7a236dcf8 -> e884805ce


SPARK-2425 Don't kill a still-running Application because of some misbehaving 
Executors

Introduces a LOADING -> RUNNING ExecutorState transition and prevents Master 
from removing an Application that still has RUNNING Executors.

Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits 
over the entire lifetime of the Application, allow that many since the last 
time any Executor successfully began running the Application (the retry count 
is reset whenever an Executor reports RUNNING); 2) Don't remove the Application 
while Master still thinks that there are RUNNING Executors.

This should be fine as long as the ApplicationInfo doesn't believe any 
Executors are forever RUNNING when they are not.  I think that any Executor 
that has actually stopped running will eventually stop being counted as RUNNING 
in Master's accounting, but another set of eyes should confirm that.  This PR 
also doesn't try to detect which nodes have gone rogue or to kill off bad 
Workers, so repeatedly failing Executors will continue to fail and fill up log 
files with failure reports for as long as the Application keeps running.

Author: Mark Hamstra <[email protected]>

Closes #1360 from markhamstra/SPARK-2425 and squashes the following commits:

f099c0b [Mark Hamstra] Reuse appInfo
b2b7b25 [Mark Hamstra] Moved 'Application failed' logging
bdd0928 [Mark Hamstra] switched to string interpolation
1dd591b [Mark Hamstra] SPARK-2425 introduce LOADING -> RUNNING ApplicationState 
transition and prevent Master from removing Application with RUNNING Executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e884805c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e884805c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e884805c

Branch: refs/heads/branch-1.1
Commit: e884805ce8b42b60534b616cb82f7bb6b8d7f907
Parents: 7a236dc
Author: Mark Hamstra <[email protected]>
Authored: Mon Sep 8 20:51:56 2014 -0700
Committer: Andrew Or <[email protected]>
Committed: Tue Sep 9 11:28:30 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/master/ApplicationInfo.scala   |  4 ++-
 .../org/apache/spark/deploy/master/Master.scala | 26 ++++++++++++--------
 .../spark/deploy/worker/ExecutorRunner.scala    |  2 ++
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 4 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e884805c/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index d367442..c3ca43f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/e884805c/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5017273..8d99ed4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -295,28 +295,34 @@ private[spark] class Master(
       val execOption = idToApp.get(appId).flatMap(app => 
app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, 
exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + 
state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
-            val normalExit = exitStatus.exists(_ == 0)
+            val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an 
infinite loop.
-            if (!normalExit && appInfo.incrementRetryCount < 
ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else if (!normalExit) {
-              logError("Application %s with ID %s failed %d times, removing 
it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < 
ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                val execs = appInfo.executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                  logError(s"Application ${appInfo.desc.name} with ID 
${appInfo.id} failed " +
+                    s"${appInfo.retryCount} times; removing it")
+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + 
execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e884805c/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7be89f9..00a4367 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver 
instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()

http://git-wip-us.apache.org/repos/asf/spark/blob/e884805c/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index da4fa2f..acc83d5 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -239,7 +239,7 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, 
execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, 
memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, 
ExecutorState.RUNNING)
+            self, workerId, host, sparkHome, workDir, akkaUrl, conf, 
ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to