Repository: spark Updated Branches: refs/heads/branch-0.9 748f002b3 -> c6630d363
SPARK-1032. If Yarn app fails before registering, app master stays around long after This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo. Author: Sandy Ryza <[email protected]> Closes #28 from sryza/sandy-spark-1032 and squashes the following commits: 5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around long after Conflicts: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6630d36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6630d36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6630d36 Branch: refs/heads/branch-0.9 Commit: c6630d363a73184c8dcca9f2c0c6fc3f5c8e47bf Parents: 748f002 Author: Sandy Ryza <[email protected]> Authored: Fri Feb 28 09:40:47 2014 -0600 Committer: Thomas Graves <[email protected]> Committed: Thu Mar 20 16:50:44 2014 -0500 ---------------------------------------------------------------------- .../spark/deploy/yarn/ApplicationMaster.scala | 34 +++++++++++++------- .../spark/deploy/yarn/ApplicationMaster.scala | 22 +++++++++---- 2 files changed, 38 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 7aa1894..e045b9f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -66,6 +66,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numWorkers * 2, 3)) + private var registered = false + private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( SparkContext.SPARK_UNKNOWN_USER) @@ -114,7 +116,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, waitForSparkContextInitialized() // Do this after spark master is up and SparkContext is created so that we can register UI Url - val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() + synchronized { + if (!isFinished) { + registerApplicationMaster() + registered = true + } + } // Allocate all containers allocateWorkers() @@ -212,7 +219,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, var count = 0 val waitTime = 10000L val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) - while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) { + while (ApplicationMaster.sparkContextRef.get() == null && count < numTries + && !isFinished) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 ApplicationMaster.sparkContextRef.wait(waitTime) @@ -345,17 +353,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, return } isFinished = true + + logInfo("finishApplicationMaster with " + status) + if (registered) { + val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) + .asInstanceOf[FinishApplicationMasterRequest] + finishReq.setAppAttemptId(appAttemptId) + finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) + // Set tracking url to empty since we don't have a history server. 
+ finishReq.setTrackingUrl("") + resourceManager.finishApplicationMaster(finishReq) + } } - - logInfo("finishApplicationMaster with " + status) - val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) - .asInstanceOf[FinishApplicationMasterRequest] - finishReq.setAppAttemptId(appAttemptId) - finishReq.setFinishApplicationStatus(status) - finishReq.setDiagnostics(diagnostics) - // Set tracking url to empty since we don't have a history server. - finishReq.setTrackingUrl("") - resourceManager.finishApplicationMaster(finishReq) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 45a7f38..b312a42 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -68,6 +68,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numWorkers * 2, 3)) + private var registered = false + private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse( SparkContext.SPARK_UNKNOWN_USER) @@ -103,7 +105,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. 
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() + synchronized { + if (!isFinished) { + registerApplicationMaster() + registered = true + } + } // Allocate all containers allocateWorkers() @@ -184,7 +191,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, var numTries = 0 val waitTime = 10000L val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) - while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { + while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries + && !isFinished) { logInfo("Waiting for Spark context initialization ... " + numTries) numTries = numTries + 1 ApplicationMaster.sparkContextRef.wait(waitTime) @@ -317,11 +325,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, return } isFinished = true - } - logInfo("finishApplicationMaster with " + status) - // Set tracking URL to empty since we don't have a history server. - amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) + logInfo("finishApplicationMaster with " + status) + if (registered) { + // Set tracking URL to empty since we don't have a history server. + amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) + } + } } /**
