Repository: spark
Updated Branches:
  refs/heads/master 5363ed715 -> e53534655


[SPARK-8297] [YARN] Scheduler backend is not notified in case node fails in YARN

This change adds code to notify the scheduler backend when a container dies in YARN.
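
Below is a minimal, self-contained sketch of the new notification path, using
simplified stand-in types (RemoveExecutor and RpcRef here are illustrative, not
Spark's real RPC API): the allocator on the AM side reports an executor whose
container completed without Spark having released it, instead of leaving the
driver to discover the loss on its own.

case class RemoveExecutor(executorId: String, reason: String)

trait RpcRef { def send(msg: Any): Unit }

// AM side: report an executor whose container YARN deallocated unexpectedly.
class AllocatorSide(driverRef: RpcRef) {
  def onContainerLost(executorId: String, containerId: String): Unit =
    driverRef.send(RemoveExecutor(executorId,
      s"Yarn deallocated the executor $executorId (container $containerId)"))
}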

Author: Mridul Muralidharan <[email protected]>
Author: Marcelo Vanzin <[email protected]>

Closes #7431 from vanzin/SPARK-8297 and squashes the following commits:

471e4a0 [Marcelo Vanzin] Fix unit test after merge.
d4adf4e [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
3b262e8 [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
537da6f [Marcelo Vanzin] Make an expected log less scary.
04dc112 [Marcelo Vanzin] Use driver <-> AM communication to send "remove executor" request.
8855b97 [Marcelo Vanzin] Merge remote-tracking branch 'mridul/fix_yarn_scheduler_bug' into SPARK-8297
687790f [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
e1b0067 [Mridul Muralidharan] Fix failing testcase, fix merge issue from our 1.3 -> master
9218fcc [Mridul Muralidharan] Fix failing testcase
362d64a [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
62ad0cc [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
bbf8811 [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
9ee1307 [Mridul Muralidharan] Fix SPARK-8297
a3a0f01 [Mridul Muralidharan] Fix SPARK-8297


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5353465
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5353465
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5353465

Branch: refs/heads/master
Commit: e53534655d6198e5b8a507010d26c7b4c4e7f1fd
Parents: 5363ed7
Author: Mridul Muralidharan <[email protected]>
Authored: Thu Jul 30 10:37:53 2015 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Jul 30 10:37:53 2015 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../cluster/YarnSchedulerBackend.scala          |  2 ++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 22 +++++++++-----
 .../spark/deploy/yarn/YarnAllocator.scala       | 32 ++++++++++++++++----
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  5 ++-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 29 ++++++++++++++++++
 6 files changed, 77 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 660702f..bd89160 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -241,7 +241,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           scheduler.executorLost(executorId, SlaveLost(reason))
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
-        case None => logError(s"Asked to remove non-existent executor $executorId")
+        case None => logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 074282d..044f628 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -109,6 +109,8 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
 
+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
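
Only messages matched by an endpoint's receive PartialFunction are handled, so
the RemoveExecutor case added above is what lets the driver act on the AM's new
message. A standalone sketch of the dispatch mechanics, with simplified names
rather than Spark's actual RpcEnv:

object ReceiveSketch {
  case class AddWebUIFilter(name: String)
  case class RemoveExecutor(executorId: String, reason: String)

  // Matched messages are handled; everything else falls through to a default.
  val handler: PartialFunction[Any, Unit] = {
    case AddWebUIFilter(name)       => println(s"installing filter $name")
    case RemoveExecutor(id, reason) => println(s"removing executor $id: $reason")
  }

  def dispatch(msg: Any): Unit =
    handler.applyOrElse(msg, (m: Any) => println(s"unhandled message: $m"))

  def main(args: Array[String]): Unit = {
    dispatch(RemoveExecutor("42", "container lost")) // handled
    dispatch("ping")                                 // falls through
  }
}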

http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 44acc73..1d67b3e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -229,7 +229,11 @@ private[spark] class ApplicationMaster(
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
+  private def registerAM(
+      _rpcEnv: RpcEnv,
+      driverRef: RpcEndpointRef,
+      uiAddress: String,
+      securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -246,6 +250,7 @@ private[spark] class ApplicationMaster(
         RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
         CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
     allocator = client.register(driverUrl,
+      driverRef,
       yarnConf,
       _sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
@@ -262,17 +267,20 @@ private[spark] class ApplicationMaster(
    *
    * In cluster mode, the AM and the driver belong to same process
    * so the AMEndpoint need not monitor lifecycle of the driver.
+   *
+   * @return A reference to the driver's RPC endpoint.
    */
   private def runAMEndpoint(
       host: String,
       port: String,
-      isClusterMode: Boolean): Unit = {
+      isClusterMode: Boolean): RpcEndpointRef = {
     val driverEndpoint = rpcEnv.setupEndpointRef(
       SparkEnv.driverActorSystemName,
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
     amEndpoint =
       rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+    driverEndpoint
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -290,11 +298,11 @@ private[spark] class ApplicationMaster(
         "Timed out waiting for SparkContext.")
     } else {
       rpcEnv = sc.env.rpcEnv
-      runAMEndpoint(
+      val driverRef = runAMEndpoint(
         sc.getConf.get("spark.driver.host"),
         sc.getConf.get("spark.driver.port"),
         isClusterMode = true)
-      registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
   }
@@ -302,9 +310,9 @@ private[spark] class ApplicationMaster(
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     val port = sparkConf.getInt("spark.yarn.am.port", 0)
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
-    waitForSparkDriver()
+    val driverRef = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+    registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
@@ -428,7 +436,7 @@ private[spark] class ApplicationMaster(
     }
   }
 
-  private def waitForSparkDriver(): Unit = {
+  private def waitForSparkDriver(): RpcEndpointRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     val hostport = args.userArgs(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6c10339..59caa78 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,9 @@ import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -52,6 +55,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
  */
 private[yarn] class YarnAllocator(
     driverUrl: String,
+    driverRef: RpcEndpointRef,
     conf: Configuration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
@@ -88,6 +92,9 @@ private[yarn] class YarnAllocator(
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
 
+  private var numUnexpectedContainerRelease = 0L
+  private val containerIdToExecutorId = new HashMap[ContainerId, String]
+
   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
   // Additional memory overhead.
@@ -184,6 +191,7 @@ private[yarn] class YarnAllocator(
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.remove(executorId).get
+      containerIdToExecutorId.remove(container.getId)
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -383,6 +391,7 @@ private[yarn] class YarnAllocator(
 
       logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
       executorIdToContainer(executorId) = container
+      containerIdToExecutorId(container.getId) = executorId
 
       val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
         new HashSet[ContainerId])
@@ -413,12 +422,8 @@ private[yarn] class YarnAllocator(
   private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
-
-      if (releasedContainers.contains(containerId)) {
-        // Already marked the container for release, so remove it from
-        // `releasedContainers`.
-        releasedContainers.remove(containerId)
-      } else {
+      val alreadyReleased = releasedContainers.remove(containerId)
+      if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
@@ -460,6 +465,18 @@ private[yarn] class YarnAllocator(
 
         allocatedContainerToHostMap.remove(containerId)
       }
+
+      containerIdToExecutorId.remove(containerId).foreach { eid =>
+        executorIdToContainer.remove(eid)
+
+        if (!alreadyReleased) {
+          // The executor could have gone away (like no route to host, node failure, etc)
+          // Notify backend about the failure of the executor
+          numUnexpectedContainerRelease += 1
+          driverRef.send(RemoveExecutor(eid,
+            s"Yarn deallocated the executor $eid (container $containerId)"))
+        }
+      }
     }
   }
 
@@ -467,6 +484,9 @@ private[yarn] class YarnAllocator(
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
+
+  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+
 }
 
 private object YarnAllocator {
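
A self-contained sketch of the bookkeeping added above, with simplified types
(plain String ids instead of YARN's ContainerId, and a stand-in for the driver
reference): any completion whose container was never explicitly released counts
as an unexpected loss and is reported to the driver.

import scala.collection.mutable

case class RemoveExecutor(executorId: String, reason: String)
trait RpcRef { def send(msg: Any): Unit }

class AllocatorBookkeeping(driverRef: RpcRef) {
  private val releasedContainers = mutable.Set[String]()
  private val containerIdToExecutorId = mutable.Map[String, String]()
  private val executorIdToContainer = mutable.Map[String, String]()
  private var numUnexpectedContainerRelease = 0L

  def registerContainer(containerId: String, executorId: String): Unit = {
    containerIdToExecutorId(containerId) = executorId
    executorIdToContainer(executorId) = containerId
  }

  // Explicit release (e.g. killExecutor): remember it so the completion
  // event is not treated as a failure.
  def release(containerId: String): Unit = releasedContainers += containerId

  def processCompletedContainer(containerId: String): Unit = {
    // remove() returns true only if we had marked the container for release.
    val alreadyReleased = releasedContainers.remove(containerId)
    containerIdToExecutorId.remove(containerId).foreach { eid =>
      executorIdToContainer.remove(eid)
      if (!alreadyReleased) {
        // Node failure, lost NodeManager, etc.: tell the driver rather than
        // letting it wait for an executor timeout.
        numUnexpectedContainerRelease += 1
        driverRef.send(RemoveExecutor(eid,
          s"Yarn deallocated the executor $eid (container $containerId)"))
      }
    }
  }

  def unexpectedReleases: Long = numUnexpectedContainerRelease
}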

http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 7f533ee..4999f9c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
 
@@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
    */
   def register(
       driverUrl: String,
+      driverRef: RpcEndpointRef,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
+      securityMgr)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e5353465/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 37a789f..58318bf 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -27,10 +27,14 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.mockito.Mockito._
+
 import org.apache.spark.{SecurityManager, SparkFunSuite}
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 
 class MockResolver extends DNSToSwitchMapping {
@@ -90,6 +94,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       "--class", "SomeClass")
     new YarnAllocator(
       "not used",
+      mock(classOf[RpcEndpointRef]),
       conf,
       sparkConf,
       rmClient,
@@ -230,6 +235,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumPendingAllocate should be (1)
   }
 
+  test("lost executor removed from backend") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
+
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+    handler.getNumExecutorsFailed should be (2)
+    handler.getNumUnexpectedContainerRelease should be (2)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
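
The new test asserts on the allocator's getNumUnexpectedContainerRelease
counter. If one also wanted to assert on the mocked driver reference directly,
a hypothetical follow-up check with Mockito (not part of this commit; Ref and
RemoveExecutor below are stand-ins) might look like:

import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, times, verify}

object VerifySketch {
  trait Ref { def send(msg: Any): Unit } // stand-in for RpcEndpointRef
  case class RemoveExecutor(executorId: String, reason: String)

  def main(args: Array[String]): Unit = {
    val driverRef = mock(classOf[Ref])
    driverRef.send(RemoveExecutor("1", "lost"))
    driverRef.send(RemoveExecutor("2", "lost"))
    // Both lost executors should have been reported to the driver.
    verify(driverRef, times(2)).send(any(classOf[RemoveExecutor]))
  }
}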

