This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 21ca618e [LIVY-991][SERVER] Facing issues with the Livy UI Driver link
(#437)
21ca618e is described below
commit 21ca618e13137ff60553e91284e7b6f26b571819
Author: RajshekharMuchandi <[email protected]>
AuthorDate: Tue Jun 25 19:54:52 2024 +0530
[LIVY-991][SERVER] Facing issues with the Livy UI Driver link (#437)
Added a conditional check so that driverLogUrl is set to None when the application state is FINISHED
Co-authored-by: Rajshekhar Muchandi <[email protected]>
---
.../scala/org/apache/livy/utils/SparkYarnApp.scala | 5 +-
.../org/apache/livy/utils/SparkYarnAppSpec.scala | 118 +++++++++++++++++++++
2 files changed, 122 insertions(+), 1 deletion(-)
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index dd551c20..3c573c20 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -315,9 +315,12 @@ class SparkYarnApp private[utils] (
val latestAppInfo = {
val attempt =
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
- val driverLogUrl =
+ val driverLogUrl = if (state == SparkApp.State.FINISHED) {
+ None
+ } else {
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
.toOption
+ }
AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index 17a78320..36995c93 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -471,6 +471,124 @@ class SparkYarnAppSpec extends FunSpec with
LivyBaseUnitTestSuite {
}
}
+ it("should expose driver log url and Spark UI url in KILLED state") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val driverLogUrl = "DRIVER LOG URL"
+ val sparkUiUrl = "SPARK UI URL"
+
+ val mockApplicationAttemptId = mock[ApplicationAttemptId]
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+ var done = false
+ when(mockAppReport.getYarnApplicationState).thenAnswer(new
Answer[YarnApplicationState]() {
+ override def answer(invocation: InvocationOnMock):
YarnApplicationState = {
+ KILLED
+ }
+ })
+
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val mockAttemptReport = mock[ApplicationAttemptReport]
+ val mockContainerId = mock[ContainerId]
+ when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+ .thenReturn(mockAttemptReport)
+
+ val mockContainerReport = mock[ContainerReport]
+
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
+ // Block test until getLogUrl is called 10 times.
+ val getLogUrlCountDown = new CountDownLatch(10)
+ when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+ override def answer(invocation: InvocationOnMock): String = {
+ getLogUrlCountDown.countDown()
+ driverLogUrl
+ }
+ })
+
+ val mockListener = mock[SparkAppListener]
+
+ val app = new SparkYarnApp(
+ appTag, appIdOption, None, Some(mockListener), livyConf,
mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+ done = true
+
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is
finished.")
+
+ verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+ verify(mockAppReport, atLeast(1)).getTrackingUrl()
+ verify(mockContainerReport, atLeast(1)).getLogUrl()
+ verify(mockListener).appIdKnown(appId.toString)
+ verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl),
Some(sparkUiUrl)))
+ }
+ }
+ }
+
+ it("should not expose driver log url and Spark UI url in FINISHED state") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val driverLogUrl = "DRIVER LOG URL"
+ val sparkUiUrl = "SPARK UI URL"
+
+ val mockApplicationAttemptId = mock[ApplicationAttemptId]
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+ var done = false
+ when(mockAppReport.getYarnApplicationState).thenAnswer(new
Answer[YarnApplicationState]() {
+ override def answer(invocation: InvocationOnMock):
YarnApplicationState = {
+ FINISHED
+ }
+ })
+
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val mockAttemptReport = mock[ApplicationAttemptReport]
+ val mockContainerId = mock[ContainerId]
+ when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+ .thenReturn(mockAttemptReport)
+
+ val mockContainerReport = mock[ContainerReport]
+
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
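+ // getLogUrl must never be called in FINISHED state (verified below), so this latch is not expected to reach zero; await() only bounds the wait by TEST_TIMEOUT.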
+ val getLogUrlCountDown = new CountDownLatch(10)
+ when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+ override def answer(invocation: InvocationOnMock): String = {
+ getLogUrlCountDown.countDown()
+ driverLogUrl
+ }
+ })
+
+ val mockListener = mock[SparkAppListener]
+
+ val app = new SparkYarnApp(
+ appTag, appIdOption, None, Some(mockListener), livyConf,
mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+ done = true
+
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is
finished.")
+
+ verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+ verify(mockAppReport, atLeast(1)).getTrackingUrl()
+ verify(mockContainerReport, never()).getLogUrl()
+ verify(mockListener).appIdKnown(appId.toString)
+ verify(mockListener,
never()).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
+ }
+ }
+ }
+
it("should not die on YARN-4411") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]