Repository: incubator-livy Updated Branches: refs/heads/branch-0.4 844275ccb -> 3d70104cf
[LIVY-406][SERVER] Fix issue where Livy cannot get the app id in yarn client mode In our `SparkYarnApp` logic, we wait for the spark-submit process to exit before querying the app id. Since the process never exits in yarn client mode, this blocks the follow-up logic that gets the app id and other yarn application information. This change proposes removing that wait. Because we now query the app id as soon as the app is launched, we should also increase the look-up timeout to avoid timing out. Author: jerryshao <ss...@hortonworks.com> Closes #50 from jerryshao/LIVY-406. (cherry picked from commit 4a537e24d605766f901232760f39b44adae817a2) Signed-off-by: jerryshao <ss...@hortonworks.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/3d70104c Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/3d70104c Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/3d70104c Branch: refs/heads/branch-0.4 Commit: 3d70104cf2403f8bf7e3caef770e8fc00eea5147 Parents: 844275c Author: jerryshao <ss...@hortonworks.com> Authored: Tue Sep 26 13:25:11 2017 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Tue Sep 26 13:27:05 2017 +0800 ---------------------------------------------------------------------- conf/livy.conf.template | 4 +-- .../main/scala/org/apache/livy/LivyConf.scala | 2 +- .../org/apache/livy/utils/SparkYarnApp.scala | 9 ------ .../apache/livy/utils/SparkYarnAppSpec.scala | 30 ++++++++++++++++++++ 4 files changed, 33 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/conf/livy.conf.template ---------------------------------------------------------------------- diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 2425059..86ca9ab 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -107,7 +107,7 @@ # livy.server.recovery.state-store.url = # 
If Livy can't find the yarn app within this time, consider it lost. -# livy.server.yarn.app-lookup-timeout = 60s +# livy.server.yarn.app-lookup-timeout = 120s # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would # cause session leakage, so we need to check session leakage. # How long to check livy session leakage @@ -116,7 +116,7 @@ # livy.server.yarn.app-leakage.check-interval = 60s # How often Livy polls YARN to refresh YARN app state. -# livy.server.yarn.poll-interval = 1s +# livy.server.yarn.poll-interval = 5s # # Days to keep Livy server request logs. # livy.server.request-log-retain.days = 5 http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/main/scala/org/apache/livy/LivyConf.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 46af8e8..0cfc8f3 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -123,7 +123,7 @@ object LivyConf { val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200) // If Livy can't find the yarn app within this time, consider it lost. - val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s") + val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s") // How often Livy polls YARN to refresh YARN app state. 
val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s") http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index 0ab6c44..358de65 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -231,15 +231,6 @@ class SparkYarnApp private[utils] ( // batch YARN queries. private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") { try { - // Wait for spark-submit to finish submitting the app to YARN. - process.foreach { p => - val exitCode = p.waitFor() - if (exitCode != 0) { - throw new Exception(s"spark-submit exited with code $exitCode}.\n" + - s"${process.get.inputLines.mkString("\n")}") - } - } - // If appId is not known, query YARN by appTag to get it. 
val appId = try { appIdOption.map(ConverterUtils.toApplicationId).getOrElse { http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/3d70104c/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index b3c50da..27afd85 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -18,6 +18,7 @@ package org.apache.livy.utils import java.util.concurrent.{CountDownLatch, TimeUnit} +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps @@ -218,6 +219,35 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { } } + it("should get App Id") { + Clock.withSleepMethod(mockSleep) { + val mockYarnClient = mock[YarnClient] + val mockAppReport = mock[ApplicationReport] + + when(mockAppReport.getApplicationTags).thenReturn(Set(appTag.toLowerCase).asJava) + when(mockAppReport.getApplicationId).thenReturn(appId) + when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED) + when(mockAppReport.getYarnApplicationState).thenReturn(RUNNING) + when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport) + when(mockYarnClient.getApplications(Set("SPARK").asJava)) + .thenReturn(List(mockAppReport).asJava) + + val mockListener = mock[SparkAppListener] + val mockSparkSubmit = mock[LineBufferedProcess] + val app = new SparkYarnApp( + appTag, None, Some(mockSparkSubmit), Some(mockListener), livyConf, mockYarnClient) + + cleanupThread(app.yarnAppMonitorThread) { + app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis) + assert(!app.yarnAppMonitorThread.isAlive, + "YarnAppMonitorThread should terminate after YARN app is finished.") + + 
verify(mockYarnClient, atLeast(1)).getApplicationReport(appId) + verify(mockListener).appIdKnown(appId.toString) + } + } + } + it("should expose driver log url and Spark UI url") { Clock.withSleepMethod(mockSleep) { val mockYarnClient = mock[YarnClient]