Repository: spark
Updated Branches:
  refs/heads/master a76bde9da -> af3bc59d1


[SPARK-8167] Make tasks that fail from YARN preemption not fail job

The architecture is that, in YARN mode, if the driver detects that an executor 
has disconnected, it asks the ApplicationMaster why the executor died. If the 
ApplicationMaster is aware that the executor died because of preemption, the 
tasks that were running on that executor are not counted towards failing the 
job. The executor is still removed from the driver's list of available 
executors, however.

There are a few open questions:
1. Should standalone mode have a similar "get executor loss reason" mechanism 
as well? I localized this change as much as possible to affect only YARN, but 
there could be a valid case to differentiate executor losses in standalone 
mode as well.
2. I make a pretty strong assumption in YarnAllocator that 
getExecutorLossReason(executorId) will only be called once per executor id; I 
do this so that I can remove the metadata from the in-memory map to avoid 
object accumulation. It's not clear if I'm being overly zealous to save space, 
however.

cc vanzin specifically for review because it collided with some earlier YARN 
scheduling work.
cc JoshRosen because it's similar to output commit coordination we did in the 
past.
cc andrewor14 for our discussion on how to get executor exit codes and loss 
reasons.

Author: mcheah <mch...@palantir.com>

Closes #8007 from mccheah/feature/preemption-handling.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af3bc59d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af3bc59d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af3bc59d

Branch: refs/heads/master
Commit: af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0
Parents: a76bde9
Author: mcheah <mch...@palantir.com>
Authored: Thu Sep 10 11:58:54 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Sep 10 11:58:54 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  | 18 +++-
 .../spark/scheduler/ExecutorLossReason.scala    | 14 +--
 .../scala/org/apache/spark/scheduler/Pool.scala |  4 +-
 .../apache/spark/scheduler/Schedulable.scala    |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  9 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 21 +++--
 .../cluster/CoarseGrainedClusterMessage.scala   |  8 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 24 +++--
 .../cluster/SparkDeploySchedulerBackend.scala   |  6 +-
 .../cluster/YarnSchedulerBackend.scala          | 77 ++++++++++++++--
 .../mesos/CoarseMesosSchedulerBackend.scala     |  4 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  9 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 33 +++++--
 .../apache/spark/util/JsonProtocolSuite.scala   | 10 ++-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  7 ++
 .../spark/deploy/yarn/YarnAllocator.scala       | 92 +++++++++++++++-----
 17 files changed, 261 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala 
b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00d..2ae878b 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,6 +48,8 @@ case object Success extends TaskEndReason
 sealed trait TaskFailedReason extends TaskEndReason {
   /** Error message displayed in the web UI. */
   def toErrorString: String
+
+  def shouldEventuallyFailJob: Boolean = true
 }
 
 /**
@@ -194,6 +196,12 @@ case object TaskKilled extends TaskFailedReason {
 case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) 
extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task 
commit)" +
     s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+  /**
+   * If a task failed because its attempt to commit was denied, do not count 
this failure
+   * towards failing the stage. This is intended to prevent spurious stage 
failures in cases
+   * where many speculative tasks are launched and denied to commit.
+   */
+  override def shouldEventuallyFailJob: Boolean = false
 }
 
 /**
@@ -202,8 +210,14 @@ case class TaskCommitDenied(jobID: Int, partitionID: Int, 
attemptID: Int) extend
  * the task crashed the JVM.
  */
 @DeveloperApi
-case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
-  override def toErrorString: String = s"ExecutorLostFailure (executor 
${execId} lost)"
+case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val exitBehavior = if (isNormalExit) "normally" else "abnormally"
+    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+  }
+
+  override def shouldEventuallyFailJob: Boolean = !isNormalExit
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2bc43a9..0a98c69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -23,16 +23,20 @@ import org.apache.spark.executor.ExecutorExitCode
  * Represents an explanation for a executor or whole slave failing or exiting.
  */
 private[spark]
-class ExecutorLossReason(val message: String) {
+class ExecutorLossReason(val message: String) extends Serializable {
   override def toString: String = message
 }
 
 private[spark]
-case class ExecutorExited(val exitCode: Int)
-  extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+  extends ExecutorLossReason(reason)
+
+private[spark] object ExecutorExited {
+  def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
+    ExecutorExited(exitCode, isNormalExit, 
ExecutorExitCode.explainExitCode(exitCode))
+  }
 }
 
 private[spark]
 case class SlaveLost(_message: String = "Slave lost")
-  extends ExecutorLossReason(_message) {
-}
+  extends ExecutorLossReason(_message)

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5821afe..551e39a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -83,8 +83,8 @@ private[spark] class Pool(
     null
   }
 
-  override def executorLost(executorId: String, host: String) {
-    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+  override def executorLost(executorId: String, host: String, reason: 
ExecutorLossReason) {
+    schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
   }
 
   override def checkSpeculatableTasks(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef03..ab00bc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -42,7 +42,7 @@ private[spark] trait Schedulable {
   def addSchedulable(schedulable: Schedulable): Unit
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
-  def executorLost(executorId: String, host: String): Unit
+  def executorLost(executorId: String, host: String, reason: 
ExecutorLossReason): Unit
   def checkSpeculatableTasks(): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1705e7f..1c7bfe8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -332,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
           if (activeExecutorIds.contains(execId)) {
-            removeExecutor(execId)
+            removeExecutor(execId,
+              SlaveLost(s"Task $tid was lost, so marking the executor as lost 
as well."))
             failedExecutor = Some(execId)
           }
         }
@@ -464,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
       if (activeExecutorIds.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, 
reason))
-        removeExecutor(executorId)
+        removeExecutor(executorId, reason)
         failedExecutor = Some(executorId)
       } else {
          // We may get multiple executorLost() calls with different loss 
reasons. For example, one
@@ -482,7 +483,7 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /** Remove an executor from all our data structures and mark it as lost */
-  private def removeExecutor(executorId: String) {
+  private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     activeExecutorIds -= executorId
     val host = executorIdToHost(executorId)
     val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -497,7 +498,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }
     executorIdToHost -= executorId
-    rootPool.executorLost(executorId, host)
+    rootPool.executorLost(executorId, host, reason)
   }
 
   def executorAdded(execId: String, host: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d..62af903 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
         }
         ef.exception
 
+      case e: ExecutorLostFailure if e.isNormalExit =>
+        logInfo(s"Task $tid failed because while it was being computed, its 
executor" +
+          s" exited normally. Not marking the task as failed.")
+        None
+
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
@@ -722,10 +727,9 @@ private[spark] class TaskSetManager(
       put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, 
taskMetrics)
     addPendingTask(index)
-    if (!isZombie && state != TaskState.KILLED && 
!reason.isInstanceOf[TaskCommitDenied]) {
-      // If a task failed because its attempt to commit was denied, do not 
count this failure
-      // towards failing the stage. This is intended to prevent spurious stage 
failures in cases
-      // where many speculative tasks are launched and denied to commit.
+    if (!isZombie && state != TaskState.KILLED
+        && reason.isInstanceOf[TaskFailedReason]
+        && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
@@ -778,7 +782,7 @@ private[spark] class TaskSetManager(
   }
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue 
our tasks */
-  override def executorLost(execId: String, host: String) {
+  override def executorLost(execId: String, host: String, reason: 
ExecutorLossReason) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
 
     // Re-enqueue pending tasks for this host based on the status of the 
cluster. Note
@@ -809,9 +813,12 @@ private[spark] class TaskSetManager(
         }
       }
     }
-    // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == 
execId) {
-      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
+      val isNormalExit: Boolean = reason match {
+        case exited: ExecutorExited => exited.isNormalExit
+        case _ => false
+      }
+      handleFailedTask(tid, TaskState.FAILED, 
ExecutorLostFailure(info.executorId, isNormalExit))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 06f5438..d947436 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.{SerializableBuffer, Utils}
 
 private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -70,7 +71,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object StopExecutors extends CoarseGrainedClusterMessage
 
-  case class RemoveExecutor(executorId: String, reason: String) extends 
CoarseGrainedClusterMessage
+  case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
+    extends CoarseGrainedClusterMessage
 
   case class SetupDriver(driver: RpcEndpointRef) extends 
CoarseGrainedClusterMessage
 
@@ -92,6 +94,10 @@ private[spark] object CoarseGrainedClusterMessages {
       hostToLocalTaskCount: Map[String, Int])
     extends CoarseGrainedClusterMessage
 
+  // Check if an executor was force-killed but for a normal reason.
+  // This could be the case if the executor is preempted, for instance.
+  case class GetExecutorLossReason(executorId: String) extends 
CoarseGrainedClusterMessage
+
   case class KillExecutors(executorIds: Seq[String]) extends 
CoarseGrainedClusterMessage
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87..18771f7 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, 
SparkException, TaskState}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
 import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, 
Utils}
 
 /**
@@ -82,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
-    private val addressToExecutorId = new HashMap[RpcAddress, String]
+    protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
     private val reviveThread =
       
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@@ -128,6 +129,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+
       case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) 
=>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
         if (executorDataMap.contains(executorId)) {
@@ -185,8 +187,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
-        "remote Rpc client disassociated"))
+      addressToExecutorId
+        .get(remoteAddress)
+        .foreach(removeExecutor(_, SlaveLost("remote Rpc client 
disassociated")))
     }
 
     // Make fake resource offers on just one executor
@@ -227,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: String): Unit = {
+    def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = 
{
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
@@ -239,9 +242,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, SlaveLost(reason))
+          scheduler.executorLost(executorId, reason)
           listenerBus.post(
-            SparkListenerExecutorRemoved(System.currentTimeMillis(), 
executorId, reason))
+            SparkListenerExecutorRemoved(System.currentTimeMillis(), 
executorId, reason.toString))
         case None => logInfo(s"Asked to remove non-existent executor 
$executorId")
       }
     }
@@ -263,8 +266,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // TODO (prashant) send conf instead of properties
-    driverEndpoint = rpcEnv.setupEndpoint(
-      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, 
properties))
+    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, 
createDriverEndpoint(properties))
+  }
+
+  protected def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
+    new DriverEndpoint(rpcEnv, properties)
   }
 
   def stopExecutors() {
@@ -304,7 +310,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   // Called by subclasses when notified of a lost worker
-  def removeExecutor(executorId: String, reason: String) {
+  def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     try {
       driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bbe51b4..27491ec 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, 
SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
 private[spark] class SparkDeploySchedulerBackend(
@@ -135,11 +135,11 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorRemoved(fullId: String, message: String, exitStatus: 
Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
-      case Some(code) => ExecutorExited(code)
+      case Some(code) => ExecutorExited(code, isNormalExit = true, message)
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
-    removeExecutor(fullId.split("/")(1), reason.toString)
+    removeExecutor(fullId.split("/")(1), reason)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 044f628..6a4b536 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, ExecutionContext}
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler._
 import org.apache.spark.ui.JettyUtils
 import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
@@ -43,8 +44,10 @@ private[spark] abstract class YarnSchedulerBackend(
 
   protected var totalExpectedExecutors = 0
 
-  private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
-    YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
+  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+  private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+    YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
 
   private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
@@ -53,7 +56,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](
+    yarnSchedulerEndpointRef.askWithRetry[Boolean](
       RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount))
   }
 
@@ -61,7 +64,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * Request that the ApplicationMaster kill the specified executors.
    */
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+    yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -90,6 +93,41 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }
 
+  override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
+    new YarnDriverEndpoint(rpcEnv, properties)
+  }
+
+  /**
+   * Override the DriverEndpoint to add extra logic for the case when an 
executor is disconnected.
+   * This endpoint communicates with the executors and queries the AM for an 
executor's exit
+   * status when the executor is disconnected.
+   */
+  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: 
Seq[(String, String)])
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    /**
+     * When onDisconnected is received at the driver endpoint, the superclass 
DriverEndpoint
+     * handles it by assuming the Executor was lost for a bad reason and 
removes the executor
+     * immediately.
+     *
+     * In YARN's case however it is crucial to talk to the application master 
and ask why the
+     * executor had exited. In particular, the executor may have exited due to 
the executor
+     * having been preempted. If the executor "exited normally" according to 
the application
+     * master then we pass that information down to the TaskSetManager to 
inform the
+     * TaskSetManager that tasks on that lost executor should not count 
towards a job failure.
+     *
+     * TODO there's a race condition where while we are querying the 
ApplicationMaster for
+     * the executor loss reason, there is the potential that tasks will be 
scheduled on
+     * the executor that failed. We should fix this by having this 
onDisconnected event
+     * also "blacklist" executors so that tasks are not assigned to them.
+     */
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, 
rpcAddress)
+      }
+    }
+  }
+
   /**
    * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
    */
@@ -101,6 +139,33 @@ private[spark] abstract class YarnSchedulerBackend(
       
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
     implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
 
+    private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+        executorId: String,
+        executorRpcAddress: RpcAddress): Unit = {
+      amEndpoint match {
+        case Some(am) =>
+          val lossReasonRequest = GetExecutorLossReason(executorId)
+          val future = am.ask[ExecutorLossReason](lossReasonRequest, 
askTimeout)
+          future onSuccess {
+            case reason: ExecutorLossReason => {
+              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, 
reason))
+            }
+          }
+          future onFailure {
+            case NonFatal(e) => {
+              logWarning(s"Attempted to get executor loss reason" +
+                s" for executor id ${executorId} at RPC address 
${executorRpcAddress}," +
+                s" but got no response. Marking as slave lost.", e)
+              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, 
SlaveLost()))
+            }
+            case t => throw t
+          }
+        case None =>
+          logWarning("Attempted to check for an executor loss reason" +
+            " before the AM has registered!")
+      }
+    }
+
     override def receive: PartialFunction[Any, Unit] = {
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
@@ -113,6 +178,7 @@ private[spark] abstract class YarnSchedulerBackend(
         removeExecutor(executorId, reason)
     }
 
+
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         amEndpoint match {
@@ -143,7 +209,6 @@ private[spark] abstract class YarnSchedulerBackend(
             logWarning("Attempted to kill executors before the AM has 
registered!")
             context.reply(false)
         }
-
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 452c32d..65df887 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, 
SparkEnv, SparkException
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
@@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         if (slaveIdToTaskId.containsKey(slaveId)) {
           val taskId: Int = slaveIdToTaskId.get(slaveId)
           taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+          removeExecutor(sparkExecutorId(slaveId, taskId.toString), 
SlaveLost(reason))
         }
         // TODO: This assumes one Spark executor per Mesos slave,
         // which may no longer be true after SPARK-5095

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2e42405..18da6d2 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -390,7 +390,7 @@ private[spark] class MesosSchedulerBackend(
                             slaveId: SlaveID, status: Int) {
     logInfo("Executor lost: %s, marking slave %s as 
lost".format(executorId.getValue,
                                                                  
slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
+    recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: 
Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd..24f7874 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,8 +362,9 @@ private[spark] object JsonProtocol {
         ("Stack Trace" -> stackTrace) ~
         ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
         ("Metrics" -> metrics)
-      case ExecutorLostFailure(executorId) =>
-        ("Executor ID" -> executorId)
+      case ExecutorLostFailure(executorId, isNormalExit) =>
+        ("Executor ID" -> executorId) ~
+        ("Normal Exit" -> isNormalExit)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -794,8 +795,10 @@ private[spark] object JsonProtocol {
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `executorLostFailure` =>
+        val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
+          map(_.extract[Boolean])
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
-        ExecutorLostFailure(executorId.getOrElse("Unknown"))
+        ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
       case `unknownReason` => UnknownReason
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index edbdb48..f0eadf2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -334,7 +334,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
     // Now mark host2 as dead
     sched.removeExecutor("exec2")
-    manager.executorLost("exec2", "host2")
+    manager.executorLost("exec2", "host2", SlaveLost())
 
     // nothing should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
@@ -504,13 +504,36 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
     // test if the valid locality is recomputed when the executor is lost
     sched.removeExecutor("execC")
-    manager.executorLost("execC", "host2")
+    manager.executorLost("execC", "host2", SlaveLost())
     assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
     sched.removeExecutor("execD")
-    manager.executorLost("execD", "host1")
+    manager.executorLost("execD", "host1", SlaveLost())
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
   }
 
+  test("Executors are added but exit normally while running tasks") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execB")),
+      Seq(TaskLocation("host2", "execC")),
+      Seq())
+    val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+    sched.addExecutor("execA", "host1")
+    manager.executorAdded()
+    sched.addExecutor("execC", "host2")
+    manager.executorAdded()
+    assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
+    sched.removeExecutor("execA")
+    manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+    assert(!sched.taskSetsFailed.contains(taskSet.id))
+    assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+    sched.removeExecutor("execC")
+    manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+    assert(sched.taskSetsFailed.contains(taskSet.id))
+  }
+
   test("test RACK_LOCAL tasks") {
     // Assign host1 to rack1
     FakeRackUtil.assignHostToRack("host1", "rack1")
@@ -721,8 +744,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
     sched.removeExecutor("execA")
     sched.removeExecutor("execB.2")
-    manager.executorLost("execA", "host1")
-    manager.executorLost("execB.2", "host2")
+    manager.executorLost("execA", "host1", SlaveLost())
+    manager.executorLost("execB.2", "host2", SlaveLost())
     clock.advance(LOCALITY_WAIT_MS * 4)
     sched.addExecutor("execC", "host3")
     manager.executorAdded()

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a413..47e548e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,7 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(exceptionFailure)
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
-    testTaskEndReason(ExecutorLostFailure("100"))
+    testTaskEndReason(ExecutorLostFailure("100", true))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -295,10 +295,10 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("ExecutorLostFailure backward compatibility") {
     // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
-    val executorLostFailure = ExecutorLostFailure("100")
+    val executorLostFailure = ExecutorLostFailure("100", true)
     val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
       .removeField({ _._1 == "Executor ID" })
-    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
@@ -577,8 +577,10 @@ class JsonProtocolSuite extends SparkFunSuite {
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
-      case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+      case (ExecutorLostFailure(execId1, isNormalExit1),
+          ExecutorLostFailure(execId2, isNormalExit2)) =>
         assert(execId1 === execId2)
+        assert(isNormalExit1 === isNormalExit2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5ce..93621b4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -590,6 +590,13 @@ private[spark] class ApplicationMaster(
           case None => logWarning("Container allocator is not ready to kill executors yet.")
         }
         context.reply(true)
+
+      case GetExecutorLossReason(eid) =>
+        Option(allocator) match {
+          case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
+          case None => logWarning(s"Container allocator is not ready to find" +
+            s" executor loss reasons yet.")
+        }
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/af3bc59d/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5f897cb..fd88b8b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,8 +21,9 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
-import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
@@ -36,8 +37,9 @@ import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.util.Utils
 
 /**
@@ -93,6 +95,11 @@ private[yarn] class YarnAllocator(
       sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
     }
 
+  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
+  // list of requesters that should be responded to once we find out why the given executor
+  // was lost.
+  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -235,9 +242,7 @@ private[yarn] class YarnAllocator(
     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
-
       processCompletedContainers(completedContainers.asScala)
-
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
         .format(completedContainers.size, numExecutorsRunning))
     }
@@ -429,7 +434,7 @@ private[yarn] class YarnAllocator(
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val alreadyReleased = releasedContainers.remove(containerId)
-      if (!alreadyReleased) {
+      val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
@@ -440,22 +445,42 @@ private[yarn] class YarnAllocator(
         // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
         // there are some exit status' we shouldn't necessarily count against us, but for
         // now I think its ok as none of the containers are expected to exit
-        if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
-          logInfo("Container preempted: " + containerId)
-        } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
-          logWarning(memLimitExceededLogMessage(
-            completedContainer.getDiagnostics,
-            VMEM_EXCEEDED_PATTERN))
-        } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
-          logWarning(memLimitExceededLogMessage(
-            completedContainer.getDiagnostics,
-            PMEM_EXCEEDED_PATTERN))
-        } else if (completedContainer.getExitStatus != 0) {
-          logInfo("Container marked as failed: " + containerId +
-            ". Exit status: " + completedContainer.getExitStatus +
-            ". Diagnostics: " + completedContainer.getDiagnostics)
-          numExecutorsFailed += 1
+        val exitStatus = completedContainer.getExitStatus
+        val (isNormalExit, containerExitReason) = exitStatus match {
+          case ContainerExitStatus.SUCCESS =>
+            (true, s"Executor for container $containerId exited normally.")
+          case ContainerExitStatus.PREEMPTED =>
+            // Preemption should count as a normal exit, since YARN preempts containers merely
+            // to do resource sharing, and tasks that fail due to preempted executors could
+            // just as easily finish on any other executor. See SPARK-8167.
+            (true, s"Container $containerId was preempted.")
+          // Should probably still count memory exceeded exit codes towards task failures
+          case VMEM_EXCEEDED_EXIT_CODE =>
+            (false, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              VMEM_EXCEEDED_PATTERN))
+          case PMEM_EXCEEDED_EXIT_CODE =>
+            (false, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              PMEM_EXCEEDED_PATTERN))
+          case unknown =>
+            numExecutorsFailed += 1
+            (false, "Container marked as failed: " + containerId +
+              ". Exit status: " + completedContainer.getExitStatus +
+              ". Diagnostics: " + completedContainer.getDiagnostics)
+
+        }
+        if (isNormalExit) {
+          logInfo(containerExitReason)
+        } else {
+          logWarning(containerExitReason)
         }
+        ExecutorExited(0, isNormalExit, containerExitReason)
+      } else {
+        // If we have already released this container, then it must mean
+        // that the driver has explicitly requested it to be killed
+        ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+          s"Container $containerId exited from explicit termination request.")
       }
 
       if (allocatedContainerToHostMap.contains(containerId)) {
@@ -474,18 +499,35 @@ private[yarn] class YarnAllocator(
 
       containerIdToExecutorId.remove(containerId).foreach { eid =>
         executorIdToContainer.remove(eid)
-
+        pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
+          // Notify application of executor loss reasons so it can decide whether it should abort
+          pendingRequests.foreach(_.reply(exitReason))
+        }
         if (!alreadyReleased) {
           // The executor could have gone away (like no route to host, node failure, etc)
           // Notify backend about the failure of the executor
           numUnexpectedContainerRelease += 1
-          driverRef.send(RemoveExecutor(eid,
-            s"Yarn deallocated the executor $eid (container $containerId)"))
+          driverRef.send(RemoveExecutor(eid, exitReason))
         }
       }
     }
   }
 
+  /**
+   * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
+   * we can only find the loss reason to send back in the next call to allocateResources().
+   */
+  private[yarn] def enqueueGetLossReasonRequest(
+      eid: String,
+      context: RpcCallContext): Unit = synchronized {
+    if (executorIdToContainer.contains(eid)) {
+      pendingLossReasonRequests
+        .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+    } else {
+      logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+    }
+  }
+
   private def internalReleaseContainer(container: Container): Unit = {
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
@@ -501,6 +543,8 @@ private object YarnAllocator {
     Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
   val VMEM_EXCEEDED_PATTERN =
     Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+  val VMEM_EXCEEDED_EXIT_CODE = -103
+  val PMEM_EXCEEDED_EXIT_CODE = -104
 
   def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
     val matcher = pattern.matcher(diagnostics)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to