Gustavo Martin created LIVY-620:
-----------------------------------
Summary: Spark batch session always ends with success when
configuration is master yarn and deploy-mode client
Key: LIVY-620
URL: https://issues.apache.org/jira/browse/LIVY-620
Project: Livy
Issue Type: Improvement
Components: Batch
Affects Versions: 0.5.0
Reporter: Gustavo Martin
In AWS emr-5.23.0 with Livy 0.5.0 and the following configuration in
/etc/livy/conf/livy.conf:
{noformat}
livy.spark.master yarn
livy.spark.deploy-mode client
{noformat}
The batch session always ends with success because YARN always finishes with final
status SUCCEEDED. Even if Spark fails for some reason (an exception or otherwise),
the batch session still ends with success.
I am not sure, but the issue of YARN always reporting success in client
deploy-mode might be related to this JIRA:
https://issues.apache.org/jira/browse/SPARK-11058
In client deploy-mode, when Spark errors occur, YARN still finishes with final
status SUCCEEDED, but the process launched by Livy (the one running
org.apache.spark.deploy.SparkSubmit) is killed and exits with a non-zero return code.
So even though YARN reports success in this case, Livy can detect from the exit
code that the run failed and end the session with an error itself.
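The idea above can be sketched in isolation. This is an illustrative example only
(the names `ExitCodeCheck` and `stateForExitCode` are hypothetical, not Livy's
actual API): launch a child process the way Livy launches spark-submit, wait for
it, and derive the session state from the process exit code rather than trusting
YARN's final status in client deploy-mode.

```scala
object ExitCodeCheck {
  // YARN may still report SUCCEEDED, but a non-zero exit code from the
  // spark-submit process is enough to mark the batch session as dead.
  def stateForExitCode(exitCode: Int): String =
    if (exitCode == 0) "success" else "dead"

  def main(args: Array[String]): Unit = {
    // Simulate a spark-submit run that fails with exit code 3.
    val child = new ProcessBuilder("sh", "-c", "exit 3").start()
    val exitCode = child.waitFor()
    println(s"spark-submit exited with code $exitCode " +
      s"-> session state: ${stateForExitCode(exitCode)}")
  }
}
```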
I have already implemented a patch (against the master branch) that could fix this issue:
{noformat}
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 4b27058..c215a8e 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -93,6 +93,7 @@ object BatchSession extends Logging {
     val file = resolveURIs(Seq(request.file), livyConf)(0)
     val sparkSubmit = builder.start(Some(file), request.args)
+    sparkSubmit.destroy()
     Utils.startDaemonThread(s"batch-session-process-$id") {
       childProcesses.incrementAndGet()
@@ -101,6 +102,7 @@ object BatchSession extends Logging {
           case 0 =>
           case exitCode =>
             warn(s"spark-submit exited with code $exitCode")
+            s.stateChanged(SparkApp.State.KILLED)
         }
       } finally {
         childProcesses.decrementAndGet()
@@ -182,6 +184,14 @@ class BatchSession(
   override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
     synchronized {
       debug(s"$this state changed from $oldState to $newState")
+      if (_state != SessionState.Dead()) {
+        stateChanged(newState)
+      }
+    }
+  }
+
+  private def stateChanged(newState: SparkApp.State): Unit = {
+    synchronized {
       newState match {
         case SparkApp.State.RUNNING =>
           _state = SessionState.Running
{noformat}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)