Repository: incubator-livy
Updated Branches:
  refs/heads/master e3f45a057 -> 551cc5309


[LIVY-452] Differentiate FAILED and KILLED states

Currently, it's not possible to distinguish between two states - 
SparkApp.State.KILLED and SparkApp.State.FAILED. In both cases the session 
state will be SessionState.Dead(). This patch adds a new SessionState.Killed() 
state, which is used when the job was actually killed by the user.

https://issues.apache.org/jira/browse/LIVY-452

- The corresponding integration test (BatchIT) was adjusted
- Tested manually on YARN cluster with `livy.spark.deploy-mode=cluster`

Author: Alexey Romanenko <aromanenko....@gmail.com>

Closes #92 from aromanenko-dev/LIVY-452-SessionState-Killed.

Change-Id: Ia18808d7954e20653e152d8bf6748f43c2707b18


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/551cc530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/551cc530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/551cc530

Branch: refs/heads/master
Commit: 551cc53095f0a4b5382602ba0c296f8cf8932e44
Parents: e3f45a0
Author: Alexey Romanenko <aromanenko....@gmail.com>
Authored: Mon May 21 09:25:04 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon May 21 09:25:04 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/livy/sessions/SessionState.scala  |  4 ++++
 docs/rest-api.md                                       |  4 ++++
 .../apache/livy/test/framework/LivyRestClient.scala    |  5 +++++
 .../src/test/scala/org/apache/livy/test/BatchIT.scala  |  2 +-
 .../org/apache/livy/server/batch/BatchSession.scala    |  4 ++--
 .../livy/server/interactive/InteractiveSession.scala   |  3 ++-
 .../scala/org/apache/livy/utils/SparkProcApp.scala     | 13 +++++++++++--
 7 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala 
b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
index e56cfbb..d731c9b 100644
--- a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
+++ b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala
@@ -39,6 +39,7 @@ object SessionState {
     case "shutting_down" => ShuttingDown
     case "error" => Error()
     case "dead" => Dead()
+    case "killed" => Killed()
     case "success" => Success()
     case _ => throw new IllegalArgumentException(s"Illegal session state: $s")
   }
@@ -57,6 +58,9 @@ object SessionState {
 
   object ShuttingDown extends SessionState("shutting_down", false)
 
+  case class Killed(override val time: Long = System.nanoTime()) extends
+    FinishedSessionState("killed", false, time)
+
   case class Error(override val time: Long = System.nanoTime()) extends
     FinishedSessionState("error", true, time)
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/docs/rest-api.md
----------------------------------------------------------------------
diff --git a/docs/rest-api.md b/docs/rest-api.md
index 8c928a8..5949b93 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -644,6 +644,10 @@ A session represents an interactive shell.
     <td>Session has exited</td>
   </tr>
   <tr>
+    <td>killed</td>
+    <td>Session has been killed</td>
+  </tr>
+  <tr>
     <td>success</td>
     <td>Session is successfully stopped</td>
   </tr>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
----------------------------------------------------------------------
diff --git 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
index 5300848..1087559 100644
--- 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
+++ 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
@@ -112,6 +112,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
 
   class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
     def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
+    def verifySessionKilled(): Unit = verifySessionState(SessionState.Killed())
     def verifySessionRunning(): Unit = verifySessionState(SessionState.Running)
     def verifySessionSuccess(): Unit = 
verifySessionState(SessionState.Success())
   }
@@ -241,6 +242,10 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
     def verifySessionIdle(): Unit = {
       verifySessionState(SessionState.Idle)
     }
+
+    def verifySessionKilled(): Unit = {
+      verifySessionState(SessionState.Killed())
+    }
   }
 
   def startBatch(

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala 
b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
index b02301b..7c433fb 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
@@ -105,7 +105,7 @@ class BatchIT extends BaseIntegrationTestSuite with 
BeforeAndAfterAll {
 
       // Kill the YARN app and check batch state should be KILLED.
       cluster.yarnClient.killApplication(appId)
-      s.verifySessionDead()
+      s.verifySessionKilled()
 
       cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus 
shouldBe
         FinalApplicationStatus.KILLED

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index c5b4b23..24c6cfd 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -179,8 +179,8 @@ class BatchSession(
           info(s"Batch session $id created [appid: ${appId.orNull}, state: 
${state.toString}, " +
             s"info: ${appInfo.asJavaMap}]")
         case SparkApp.State.FINISHED => _state = SessionState.Success()
-        case SparkApp.State.KILLED | SparkApp.State.FAILED =>
-          _state = SessionState.Dead()
+        case SparkApp.State.KILLED => _state = SessionState.Killed()
+        case SparkApp.State.FAILED => _state = SessionState.Dead()
         case _ =>
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index d12fcb3..a97747c 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -610,8 +610,9 @@ class InteractiveSession(
     synchronized {
       debug(s"$this app state changed from $oldState to $newState")
       newState match {
-        case SparkApp.State.FINISHED | SparkApp.State.KILLED | 
SparkApp.State.FAILED =>
+        case SparkApp.State.FINISHED | SparkApp.State.FAILED =>
           transition(SessionState.Dead())
+        case SparkApp.State.KILLED => transition(SessionState.Killed())
         case _ =>
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala 
b/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala
index aafe2e9..2eb864c 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala
@@ -31,7 +31,10 @@ class SparkProcApp (
 
   private var state = SparkApp.State.STARTING
 
+  private var killed = false
+
   override def kill(): Unit = {
+    killed = true
     if (process.isAlive) {
       process.destroy()
       waitThread.join()
@@ -53,8 +56,14 @@ class SparkProcApp (
     process.waitFor() match {
       case 0 => changeState(SparkApp.State.FINISHED)
       case exitCode =>
-        changeState(SparkApp.State.FAILED)
-        error(s"spark-submit exited with code $exitCode")
+        val message = if (killed) {
+          changeState(SparkApp.State.KILLED)
+          "job was killed by user"
+        } else {
+          changeState(SparkApp.State.FAILED)
+          s"spark-submit exited with code $exitCode"
+        }
+        error(message)
     }
   }
 }

Reply via email to