This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ca618e [LIVY-991][SERVER] Facing issues with the Livy UI Driver link 
(#437)
21ca618e is described below

commit 21ca618e13137ff60553e91284e7b6f26b571819
Author: RajshekharMuchandi <[email protected]>
AuthorDate: Tue Jun 25 19:54:52 2024 +0530

    [LIVY-991][SERVER] Facing issues with the Livy UI Driver link (#437)
    
    Added a conditional check on the FINISHED state to set driverLogUrl to None
    
    Co-authored-by: Rajshekhar Muchandi <[email protected]>
---
 .../scala/org/apache/livy/utils/SparkYarnApp.scala |   5 +-
 .../org/apache/livy/utils/SparkYarnAppSpec.scala   | 118 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala 
b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index dd551c20..3c573c20 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -315,9 +315,12 @@ class SparkYarnApp private[utils] (
             val latestAppInfo = {
               val attempt =
                 
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
-              val driverLogUrl =
+              val driverLogUrl = if (state == SparkApp.State.FINISHED) {
+                None
+              } else {
                 
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
                   .toOption
+              }
               AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
             }
 
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala 
b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index 17a78320..36995c93 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -471,6 +471,124 @@ class SparkYarnAppSpec extends FunSpec with 
LivyBaseUnitTestSuite {
       }
     }
 
+    it("should expose driver log url and Spark UI url in KILLED state") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val driverLogUrl = "DRIVER LOG URL"
+        val sparkUiUrl = "SPARK UI URL"
+
+        val mockApplicationAttemptId = mock[ApplicationAttemptId]
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+        
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+        var done = false
+        when(mockAppReport.getYarnApplicationState).thenAnswer(new 
Answer[YarnApplicationState]() {
+          override def answer(invocation: InvocationOnMock): 
YarnApplicationState = {
+            KILLED
+          }
+        })
+        
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val mockAttemptReport = mock[ApplicationAttemptReport]
+        val mockContainerId = mock[ContainerId]
+        when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+        
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+          .thenReturn(mockAttemptReport)
+
+        val mockContainerReport = mock[ContainerReport]
+        
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
+        // Block test until getLogUrl is called 10 times.
+        val getLogUrlCountDown = new CountDownLatch(10)
+        when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+          override def answer(invocation: InvocationOnMock): String = {
+            getLogUrlCountDown.countDown()
+            driverLogUrl
+          }
+        })
+
+        val mockListener = mock[SparkAppListener]
+
+        val app = new SparkYarnApp(
+          appTag, appIdOption, None, Some(mockListener), livyConf, 
mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+          done = true
+
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is 
finished.")
+
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockAppReport, atLeast(1)).getTrackingUrl()
+          verify(mockContainerReport, atLeast(1)).getLogUrl()
+          verify(mockListener).appIdKnown(appId.toString)
+          verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), 
Some(sparkUiUrl)))
+        }
+      }
+    }
+
+    it("should not expose driver log url and Spark UI url in FINISHED state") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val driverLogUrl = "DRIVER LOG URL"
+        val sparkUiUrl = "SPARK UI URL"
+
+        val mockApplicationAttemptId = mock[ApplicationAttemptId]
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+        
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+        var done = false
+        when(mockAppReport.getYarnApplicationState).thenAnswer(new 
Answer[YarnApplicationState]() {
+          override def answer(invocation: InvocationOnMock): 
YarnApplicationState = {
+            FINISHED
+          }
+        })
+        
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val mockAttemptReport = mock[ApplicationAttemptReport]
+        val mockContainerId = mock[ContainerId]
+        when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+        
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+          .thenReturn(mockAttemptReport)
+
+        val mockContainerReport = mock[ContainerReport]
+        
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
+        // Block test until getLogUrl is called 10 times.
+        val getLogUrlCountDown = new CountDownLatch(10)
+        when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+          override def answer(invocation: InvocationOnMock): String = {
+            getLogUrlCountDown.countDown()
+            driverLogUrl
+          }
+        })
+
+        val mockListener = mock[SparkAppListener]
+
+        val app = new SparkYarnApp(
+          appTag, appIdOption, None, Some(mockListener), livyConf, 
mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+          done = true
+
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is 
finished.")
+
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockAppReport, atLeast(1)).getTrackingUrl()
+          verify(mockContainerReport, never()).getLogUrl()
+          verify(mockListener).appIdKnown(appId.toString)
+          verify(mockListener, 
never()).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
+        }
+      }
+    }
+
     it("should not die on YARN-4411") {
       Clock.withSleepMethod(mockSleep) {
         val mockYarnClient = mock[YarnClient]

Reply via email to