Repository: incubator-livy
Updated Branches:
  refs/heads/branch-0.4 844275ccb -> 3d70104cf


[LIVY-406][SERVER] Fix Livy cannot get app id in yarn client mode issue

In our `SparkYarnApp` logic, we wait for the spark-submit process to exit
before querying the app id. Since the process never exits in yarn client mode,
this blocks the follow-up logic that gets the app id and other yarn application
information.

So this change proposes removing that wait. Also, because we now start querying
the app id as soon as the app is launched, we should increase the app look-up
timeout to avoid timing out.

Author: jerryshao <ss...@hortonworks.com>

Closes #50 from jerryshao/LIVY-406.

(cherry picked from commit 4a537e24d605766f901232760f39b44adae817a2)
Signed-off-by: jerryshao <ss...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/3d70104c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/3d70104c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/3d70104c

Branch: refs/heads/branch-0.4
Commit: 3d70104cf2403f8bf7e3caef770e8fc00eea5147
Parents: 844275c
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Sep 26 13:25:11 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue Sep 26 13:27:05 2017 +0800

----------------------------------------------------------------------
 conf/livy.conf.template                         |  4 +--
 .../main/scala/org/apache/livy/LivyConf.scala   |  2 +-
 .../org/apache/livy/utils/SparkYarnApp.scala    |  9 ------
 .../apache/livy/utils/SparkYarnAppSpec.scala    | 30 ++++++++++++++++++++
 4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/conf/livy.conf.template
----------------------------------------------------------------------
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 2425059..86ca9ab 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -107,7 +107,7 @@
 # livy.server.recovery.state-store.url =
 
 # If Livy can't find the yarn app within this time, consider it lost.
-# livy.server.yarn.app-lookup-timeout = 60s
+# livy.server.yarn.app-lookup-timeout = 120s
 # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
 # cause session leakage, so we need to check session leakage.
 # How long to check livy session leakage
@@ -116,7 +116,7 @@
 # livy.server.yarn.app-leakage.check-interval = 60s
 
 # How often Livy polls YARN to refresh YARN app state.
-# livy.server.yarn.poll-interval = 1s
+# livy.server.yarn.poll-interval = 5s
 #
 # Days to keep Livy server request logs.
 # livy.server.request-log-retain.days = 5

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/main/scala/org/apache/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 46af8e8..0cfc8f3 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -123,7 +123,7 @@ object LivyConf {
   val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
 
   // If Livy can't find the yarn app within this time, consider it lost.
-  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")
+  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
 
   // How often Livy polls YARN to refresh YARN app state.
   val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 0ab6c44..358de65 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -231,15 +231,6 @@ class SparkYarnApp private[utils] (
   // batch YARN queries.
   private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
     try {
-      // Wait for spark-submit to finish submitting the app to YARN.
-      process.foreach { p =>
-        val exitCode = p.waitFor()
-        if (exitCode != 0) {
-          throw new Exception(s"spark-submit exited with code $exitCode}.\n" +
-            s"${process.get.inputLines.mkString("\n")}")
-        }
-      }
-
       // If appId is not known, query YARN by appTag to get it.
       val appId = try {
         appIdOption.map(ConverterUtils.toApplicationId).getOrElse {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index b3c50da..27afd85 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -18,6 +18,7 @@ package org.apache.livy.utils
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -218,6 +219,35 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
       }
     }
 
+    it("should get App Id") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val mockAppReport = mock[ApplicationReport]
+
+        when(mockAppReport.getApplicationTags).thenReturn(Set(appTag.toLowerCase).asJava)
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getYarnApplicationState).thenReturn(RUNNING)
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+        when(mockYarnClient.getApplications(Set("SPARK").asJava))
+          .thenReturn(List(mockAppReport).asJava)
+
+        val mockListener = mock[SparkAppListener]
+        val mockSparkSubmit = mock[LineBufferedProcess]
+        val app = new SparkYarnApp(
+          appTag, None, Some(mockSparkSubmit), Some(mockListener), livyConf, mockYarnClient)
+
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is finished.")
+
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockListener).appIdKnown(appId.toString)
+        }
+      }
+    }
+
     it("should expose driver log url and Spark UI url") {
       Clock.withSleepMethod(mockSleep) {
         val mockYarnClient = mock[YarnClient]

Reply via email to