Repository: spark
Updated Branches:
  refs/heads/branch-2.0 794d09969 -> ab4303800


[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher

## What changes were proposed in this pull request?
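Yarn cluster mode should return the correct state for SparkLauncher. When YARN reports the application state as FINISHED, the client now checks the final application status and reports FAILED or KILLED to the launcher accordingly, instead of always reporting FINISHED.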

## How was this patch tested?
Unit test: added "run Spark in yarn-cluster mode failure after sc initialized" to YarnClusterSuite.

Author: peng.zhang <[email protected]>

Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.

(cherry picked from commit bad0f7dbba2eda149ee4fc5810674d971d17874a)
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab430380
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab430380
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab430380

Branch: refs/heads/branch-2.0
Commit: ab4303800d04c12828dd2896add3e84b2545a25a
Parents: 794d099
Author: peng.zhang <[email protected]>
Authored: Fri Jul 1 15:51:21 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Jul 1 15:51:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   |  9 ++++-
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 37 ++++++++++++--------
 2 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab430380/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9bb3695..01aa12a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1053,7 +1053,14 @@ private[spark] class Client(
           case YarnApplicationState.RUNNING =>
             reportLauncherState(SparkAppHandle.State.RUNNING)
           case YarnApplicationState.FINISHED =>
-            reportLauncherState(SparkAppHandle.State.FINISHED)
+            report.getFinalApplicationStatus match {
+              case FinalApplicationStatus.FAILED =>
+                reportLauncherState(SparkAppHandle.State.FAILED)
+              case FinalApplicationStatus.KILLED =>
+                reportLauncherState(SparkAppHandle.State.KILLED)
+              case _ =>
+                reportLauncherState(SparkAppHandle.State.FINISHED)
+            }
           case YarnApplicationState.FAILED =>
             reportLauncherState(SparkAppHandle.State.FAILED)
           case YarnApplicationState.KILLED =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ab430380/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6b20dea..9085fca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("run Spark in yarn-cluster mode failure after sc initialized") {
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
+    finalState should be (SparkAppHandle.State.FAILED)
+  }
+
   test("run Python application in yarn-client mode") {
     testPySpark(true)
   }
@@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener {
   }
 }
 
+private object YarnClusterDriverWithFailure extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test with failure"))
+
+    throw new Exception("exception after sc initialized")
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers {
       sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
       data should be (Set(1, 2, 3, 4))
       result = "success"
+
+      // Verify that the config archive is correctly placed in the classpath of all containers.
+      val confFile = "/" + Client.SPARK_CONF_FILE
+      assert(getClass().getResource(confFile) != null)
+      val configFromExecutors = sc.parallelize(1 to 4, 4)
+        .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
+        .collect()
+      assert(configFromExecutors.find(_ == null) === None)
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
       sc.stop()
     }
 
-    // Verify that the config archive is correctly placed in the classpath of all containers.
-    val confFile = "/" + Client.SPARK_CONF_FILE
-    assert(getClass().getResource(confFile) != null)
-    val configFromExecutors = sc.parallelize(1 to 4, 4)
-      .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
-      .collect()
-    assert(configFromExecutors.find(_ == null) === None)
-
     // verify log urls are present
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
@@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers {
 }
 
 private object YarnClasspathTest extends Logging {
-
-  var exitCode = 0
-
   def error(m: String, ex: Throwable = null): Unit = {
     logError(m, ex)
     // scalastyle:off println
@@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging {
     } finally {
       sc.stop()
     }
-    System.exit(exitCode)
   }
 
   private def readResource(resultPath: String): Unit = {
@@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging {
     } catch {
       case t: Throwable =>
         error(s"loading test.resource to $resultPath", t)
-        // set the exit code if not yet set
-        exitCode = 2
     } finally {
       Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to