Updated Branches:
  refs/heads/branch-0.8 d77c3371b -> d6e5eab2f

Merge pull request #189 from tgravescs/sparkYarnErrorHandling

Improve Spark on Yarn Error handling

Improve cli error handling and only allow a certain number of worker failures 
before failing the application.  This will help prevent users from doing 
foolish things and their jobs running forever.  For instance, using 32-bit java 
but trying to allocate 8G containers loops forever without this change; 
now it errors out after a certain number of retries.  The number of tries is 
configurable.  Also increase the frequency at which we ping the RM, to speed up 
getting replacement containers if they die. The Yarn MR app defaults to pinging 
the RM every 1 second, so the default of 5 seconds here is fine. But that is 
configurable as well in case people want to change it.

I do want to make sure there aren't any cases where calling stopExecutors in 
CoarseGrainedSchedulerBackend would cause problems.  I couldn't think of any, 
and I have tested on a standalone cluster as well as on yarn.
(cherry picked from commit aa638ed9c140174a47df082ed5631ffe8e624ee6)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ee22be0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ee22be0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ee22be0e

Branch: refs/heads/branch-0.8
Commit: ee22be0e6c302fb2cdb24f83365c2b8a43a1baab
Parents: d77c337
Author: Matei Zaharia <[email protected]>
Authored: Tue Nov 19 16:05:44 2013 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Fri Dec 6 23:29:38 2013 -0800

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  1 +
 .../cluster/SimrSchedulerBackend.scala          |  1 -
 docs/running-on-yarn.md                         |  2 +
 .../spark/deploy/yarn/ApplicationMaster.scala   | 39 +++++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 32 ++++++++++------
 .../deploy/yarn/YarnAllocationHandler.scala     | 16 ++++++--
 6 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a45bee5..d0ba5bf 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
ClusterScheduler, actorSystem: Ac
   }
 
   override def stop() {
+    stopExecutors()
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0ea35e2..e000531 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
     fs.delete(new Path(driverFilePath), false)
-    super.stopExecutors()
     super.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6fd1d0d..4056e9c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -37,6 +37,8 @@ System Properties:
 * 'spark.yarn.applicationMaster.waitTries', property to set the number of 
times the ApplicationMaster waits for the the spark master and then also the 
number of tries it waits for the Spark Context to be intialized. Default is 10.
 * 'spark.yarn.submit.file.replication', the HDFS replication level for the 
files uploaded into HDFS for the application. These include things like the 
spark jar, the app jar, and any distributed cache files/archives.
 * 'spark.yarn.preserve.staging.files', set to true to preserve the staged 
files(spark jar, app jar, distributed cache files) at the end of the job rather 
then delete them.
+* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which 
the Spark application master heartbeats into the YARN ResourceManager. Default 
is 5 seconds. 
+* 'spark.yarn.max.worker.failures', the maximum number of worker failures 
before failing the application. Default is the number of workers requested 
times 2 with minimum of 3.
 
 # Launching Spark on YARN
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4302ef4..2afc1d9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
   private val maxAppAttempts: Int = 
conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
-
+  // default to numWorkers * 2, with minimum of 3 
+  private val maxNumWorkerFailures = 
System.getProperty("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3).toString()).toInt
 
   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
 
         if (null != sparkContext) {
           uiAddress = sparkContext.ui.appUIAddress
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, 
resourceManager, appAttemptId, args, 
-                                               
sparkContext.preferredNodeLocationData) 
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, 
resourceManager, 
+            appAttemptId, args, sparkContext.preferredNodeLocationData) 
         } else {
           logWarning("Unable to retrieve sparkContext inspite of waiting for " 
+ count * waitTime + 
-                  ", numTries = " + numTries)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, 
resourceManager, appAttemptId, args)
+            ", numTries = " + numTries)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, 
resourceManager,
+            appAttemptId, args)
         }
       }
     } finally {
@@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
       while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
         // If user thread exists, then quit !
         userThread.isAlive) {
-
-          this.yarnAllocator.allocateContainers(math.max(args.numWorkers - 
yarnAllocator.getNumWorkersRunning, 0))
+          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED,
+              "max number of worker failures reached")
+          }
+          yarnAllocator.allocateContainers(math.max(args.numWorkers - 
yarnAllocator.getNumWorkersRunning, 0))
           ApplicationMaster.incrementAllocatorLoop(1)
           Thread.sleep(100)
       }
@@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
       // ensure that progress is sent before 
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
       val timeoutInterval = 
yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-      // must be <= timeoutInterval/ 2.
-      // On other hand, also ensure that we are reasonably responsive without 
causing too many requests to RM.
-      // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-      val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 
10, 60000L))
+
+      // we want to be reasonably responsive without causing too many requests 
to RM.
+      val schedulerInterval = 
+        System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", 
"5000").toLong
+
+      // must be <= timeoutInterval / 2.
+      val interval = math.min(timeoutInterval / 2, schedulerInterval)
       launchReporterThread(interval)
     }
   }
 
-  // TODO: We might want to extend this to allocate more containers in case 
they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
+          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED, 
+              "max number of worker failures reached")
+          }
           val missingWorkerCount = args.numWorkers - 
yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
             logInfo("Allocating " + missingWorkerCount + " containers to make 
up for (potentially ?) lost containers")
@@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
   }
   */
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
+  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: 
String = "") {
 
     synchronized {
       if (isFinished) {
@@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    finishReq.setDiagnostics(diagnostics)
     // set tracking url to empty since we don't have a history server
     finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4e0e060..15b3480 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
   val APP_FILE_PERMISSION: FsPermission = 
FsPermission.createImmutable(0644:Short) 
 
   def run() {
+    validateArgs()
+
     init(yarnConf)
     start()
     logClusterResourceDetails()
@@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
     System.exit(0)
   }
 
+  def validateArgs() = {
+    Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR 
environment variable!",
+      (args.userJar == null) -> "Error: You must specify a user jar!",
+      (args.userClass == null) -> "Error: You must specify a user class!",
+      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: AM memory size must be greater then: " + 
YarnAllocationHandler.MEMORY_OVERHEAD),
+      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: Worker memory size must be greater then: " + 
YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+    .foreach { case(cond, errStr) => 
+      if (cond) {
+        logError(errStr)
+        args.printUsageAndExit(1)
+      }
+    }
+  }
+
   def getAppStagingDir(appId: ApplicationId): String = {
     SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
   }
@@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
       ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", 
queueApplicationCount=" + queueInfo.getApplications.size +
       ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
   }
-
   
   def verifyClusterResources(app: GetNewApplicationResponse) = { 
     val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
-      logError("Error: You must set SPARK_JAR environment variable and specify 
a user jar!")
-      System.exit(1)
-    }
-
     Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> 
args.userJar, 
       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
@@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
     JAVA_OPTS += " -Djava.io.tmpdir=" + 
       new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
 
-
     // Commenting it out for now - so that people can refer to the properties 
if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all 
cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also 
be other spark containers)
@@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) 
extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }
 
-    if (args.userClass == null) {
-      logError("Error: You must specify a user class!")
-      System.exit(1)
-    }
-
     val commands = List[String](javaCommand + 
       " -server " +
       JAVA_OPTS +
@@ -442,6 +449,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val args = new ClientArguments(argStrings)
+
     new Client(args).run
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ee22be0e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 25da9aa..507a074 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: 
Configuration, val resourceM
   // Used to generate a unique id per worker
   private val workerIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
+  private val numWorkersFailed = new AtomicInteger()
 
   def getNumWorkersRunning: Int = numWorkersRunning.intValue
 
+  def getNumWorkersFailed: Int = numWorkersFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
     container.getResource.getMemory >= (workerMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: 
Configuration, val resourceM
         else {
           // simply decrement count - next iteration of ReporterThread will 
take care of allocating !
           numWorkersRunning.decrementAndGet()
-          logInfo("Container completed ? nodeId: " + containerId + ", state " 
+ completedContainer.getState +
-            " httpaddress: " + completedContainer.getDiagnostics)
+          logInfo("Container completed not by us ? nodeId: " + containerId + 
", state " + completedContainer.getState +
+            " httpaddress: " + completedContainer.getDiagnostics + " exit 
status: " + completedContainer.getExitStatus())
+
+          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+          // there are some exit status' we shouldn't necessarily count 
against us, but for
+          // now I think its ok as none of the containers are expected to exit
+          if (completedContainer.getExitStatus() != 0) {
+            logInfo("Container marked as failed: " + containerId) 
+            numWorkersFailed.incrementAndGet()
+          }
         }
 
         allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: 
Configuration, val resourceM
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)
 
-
-
     if (numWorkers > 0) {
       logInfo("Allocating " + numWorkers + " worker containers with " + 
(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
     }

Reply via email to