Repository: incubator-livy Updated Branches: refs/heads/master e3f45a057 -> 551cc5309
[LIVY-452] Differentiate FAILED and KILLED states Currently, it's not possible to distinguish between the two states SparkApp.State.KILLED and SparkApp.State.FAILED. In both cases the session state will be SessionState.Dead(). This patch adds a new SessionState.Killed() state, which is used when the job was actually killed by the user. https://issues.apache.org/jira/browse/LIVY-452 - Corresponding unit test was adjusted - Tested manually on a YARN cluster with `livy.spark.deploy-mode=cluster` Author: Alexey Romanenko <aromanenko....@gmail.com> Closes #92 from aromanenko-dev/LIVY-452-SessionState-Killed. Change-Id: Ia18808d7954e20653e152d8bf6748f43c2707b18 Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/551cc530 Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/551cc530 Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/551cc530 Branch: refs/heads/master Commit: 551cc53095f0a4b5382602ba0c296f8cf8932e44 Parents: e3f45a0 Author: Alexey Romanenko <aromanenko....@gmail.com> Authored: Mon May 21 09:25:04 2018 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Mon May 21 09:25:04 2018 +0800 ---------------------------------------------------------------------- .../scala/org/apache/livy/sessions/SessionState.scala | 4 ++++ docs/rest-api.md | 4 ++++ .../apache/livy/test/framework/LivyRestClient.scala | 5 +++++ .../src/test/scala/org/apache/livy/test/BatchIT.scala | 2 +- .../org/apache/livy/server/batch/BatchSession.scala | 4 ++-- .../livy/server/interactive/InteractiveSession.scala | 3 ++- .../scala/org/apache/livy/utils/SparkProcApp.scala | 13 +++++++++++-- 7 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/core/src/main/scala/org/apache/livy/sessions/SessionState.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala index e56cfbb..d731c9b 100644 --- a/core/src/main/scala/org/apache/livy/sessions/SessionState.scala +++ b/core/src/main/scala/org/apache/livy/sessions/SessionState.scala @@ -39,6 +39,7 @@ object SessionState { case "shutting_down" => ShuttingDown case "error" => Error() case "dead" => Dead() + case "killed" => Killed() case "success" => Success() case _ => throw new IllegalArgumentException(s"Illegal session state: $s") } @@ -57,6 +58,9 @@ object SessionState { object ShuttingDown extends SessionState("shutting_down", false) + case class Killed(override val time: Long = System.nanoTime()) extends + FinishedSessionState("killed", false, time) + case class Error(override val time: Long = System.nanoTime()) extends FinishedSessionState("error", true, time) http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/docs/rest-api.md ---------------------------------------------------------------------- diff --git a/docs/rest-api.md b/docs/rest-api.md index 8c928a8..5949b93 100644 --- a/docs/rest-api.md +++ b/docs/rest-api.md @@ -644,6 +644,10 @@ A session represents an interactive shell. 
<td>Session has exited</td> </tr> <tr> + <td>killed</td> + <td>Session has been killed</td> + </tr> + <tr> <td>success</td> <td>Session is successfully stopped</td> </tr> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala ---------------------------------------------------------------------- diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala index 5300848..1087559 100644 --- a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala +++ b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala @@ -112,6 +112,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) class BatchSession(id: Int) extends Session(id, BATCH_TYPE) { def verifySessionDead(): Unit = verifySessionState(SessionState.Dead()) + def verifySessionKilled(): Unit = verifySessionState(SessionState.Killed()) def verifySessionRunning(): Unit = verifySessionState(SessionState.Running) def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success()) } @@ -241,6 +242,10 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) def verifySessionIdle(): Unit = { verifySessionState(SessionState.Idle) } + + def verifySessionKilled(): Unit = { + verifySessionState(SessionState.Killed()) + } } def startBatch( http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala ---------------------------------------------------------------------- diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala index b02301b..7c433fb 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala +++ 
b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala @@ -105,7 +105,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll { // Kill the YARN app and check batch state should be KILLED. cluster.yarnClient.killApplication(appId) - s.verifySessionDead() + s.verifySessionKilled() cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe FinalApplicationStatus.KILLED http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index c5b4b23..24c6cfd 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -179,8 +179,8 @@ class BatchSession( info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " + s"info: ${appInfo.asJavaMap}]") case SparkApp.State.FINISHED => _state = SessionState.Success() - case SparkApp.State.KILLED | SparkApp.State.FAILED => - _state = SessionState.Dead() + case SparkApp.State.KILLED => _state = SessionState.Killed() + case SparkApp.State.FAILED => _state = SessionState.Dead() case _ => } } http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index d12fcb3..a97747c 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -610,8 +610,9 @@ class InteractiveSession( synchronized { debug(s"$this app state changed from $oldState to $newState") newState match { - case SparkApp.State.FINISHED | SparkApp.State.KILLED | SparkApp.State.FAILED => + case SparkApp.State.FINISHED | SparkApp.State.FAILED => transition(SessionState.Dead()) + case SparkApp.State.KILLED => transition(SessionState.Killed()) case _ => } } http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/551cc530/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala index aafe2e9..2eb864c 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkProcApp.scala @@ -31,7 +31,10 @@ class SparkProcApp ( private var state = SparkApp.State.STARTING + private var killed = false + override def kill(): Unit = { + killed = true if (process.isAlive) { process.destroy() waitThread.join() @@ -53,8 +56,14 @@ class SparkProcApp ( process.waitFor() match { case 0 => changeState(SparkApp.State.FINISHED) case exitCode => - changeState(SparkApp.State.FAILED) - error(s"spark-submit exited with code $exitCode") + val message = if (killed) { + changeState(SparkApp.State.KILLED) + "job was killed by user" + } else { + changeState(SparkApp.State.FAILED) + s"spark-submit exited with code $exitCode" + } + error(message) } } }