This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c6f60b7 [LIVY-896] Livy not capture spark-submit error exit if timing is right (#358)
8c6f60b7 is described below

commit 8c6f60b74633f25c211ca248ba290de65aee89f9
Author: Jeff Xu <[email protected]>
AuthorDate: Tue Nov 8 01:39:15 2022 -0800

    [LIVY-896] Livy not capture spark-submit error exit if timing is right (#358)
---
 .../scala/org/apache/livy/utils/SparkYarnApp.scala | 67 ++++++++++-------
 .../org/apache/livy/utils/SparkYarnAppSpec.scala   | 86 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 26 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index a245823e..dd551c20 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -164,6 +164,10 @@ class SparkYarnApp private[utils] (
     }
   }
 
+  private def isProcessAlive(): Boolean = {
+    process.isDefined && process.get.isAlive
+  }
+
   private def isProcessErrExit(): Boolean = {
     process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
   }
@@ -282,34 +286,45 @@ class SparkYarnApp private[utils] (
         try {
           Clock.sleep(pollInterval.toMillis)
 
-          // Refresh application state
-          val appReport = yarnClient.getApplicationReport(appId)
-          yarnDiagnostics = getYarnDiagnostics(appReport)
-          changeState(mapYarnState(
-            appReport.getApplicationId,
-            appReport.getYarnApplicationState,
-            appReport.getFinalApplicationStatus))
-
-          if (isProcessErrExit()) {
-            if (killed) {
-              changeState(SparkApp.State.KILLED)
-            } else {
-              changeState(SparkApp.State.FAILED)
+          if (!isProcessAlive()) {
+            // Refresh application state
+            val appReport = yarnClient.getApplicationReport(appId)
+            yarnDiagnostics = getYarnDiagnostics(appReport)
+
+            // figure out the application's actual state and update in a 
single operation
+            val sessState =
+              if (isProcessErrExit()) {
+                if (killed) {
+                  debug(s"sess state: process killed")
+                  SparkApp.State.KILLED
+                } else {
+                  debug(s"sess state: process err exited")
+                  SparkApp.State.FAILED
+                }
+              }
+              else {
+                val yarnState = mapYarnState(
+                  appReport.getApplicationId,
+                  appReport.getYarnApplicationState,
+                  appReport.getFinalApplicationStatus)
+                debug(s"sess state: yarn=${yarnState}")
+                yarnState
+              }
+            changeState(sessState)
+
+            val latestAppInfo = {
+              val attempt =
+                
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
+              val driverLogUrl =
+                
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
+                  .toOption
+              AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
             }
-          }
-
-          val latestAppInfo = {
-            val attempt =
-              
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
-            val driverLogUrl =
-              
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
-                .toOption
-            AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
-          }
 
-          if (appInfo != latestAppInfo) {
-            listener.foreach(_.infoChanged(latestAppInfo))
-            appInfo = latestAppInfo
+            if (appInfo != latestAppInfo) {
+              listener.foreach(_.infoChanged(latestAppInfo))
+              appInfo = latestAppInfo
+            }
           }
         } catch {
           // This exception might be thrown during app is starting up. It's 
transient.
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index ddd97674..17a78320 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -269,6 +269,92 @@ class SparkYarnAppSpec extends FunSpec with 
LivyBaseUnitTestSuite {
       }
     }
 
+    // LIVY-896
+    it("should end with state failed when spark submit failed but Yarn reports 
SUCCESS") {
+      Clock.withSleepMethod(mockSleep) {
+        val diag = "DIAG"
+        val mockYarnClient = mock[YarnClient]
+
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getDiagnostics).thenReturn(diag)
+        
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+        
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val mockSparkSubmit = mock[LineBufferedProcess]
+        when(mockSparkSubmit.isAlive).thenReturn(false)
+        when(mockSparkSubmit.exitValue).thenReturn(-1)
+
+        val listener = new SparkAppListener {
+          var nStateChanged = 0
+          override def stateChanged(oldState: SparkApp.State, newState: 
SparkApp.State): Unit = {
+            nStateChanged += 1
+          }
+
+        }
+        val app = new SparkYarnApp(
+          appTag,
+          None,
+          Some(mockSparkSubmit),
+          Some(listener),
+          livyConf,
+          mockYarnClient)
+
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is 
finished.")
+          assert(app.state == SparkApp.State.FAILED,
+            "SparkYarnApp should end with state failed when spark submit 
failed")
+          assert(listener.nStateChanged == 1,
+            "SparkYarnApp should make state change only once.")
+        }
+      }
+    }
+
+    // LIVY-896
+    it("should never change state if spark submit not finished") {
+      Clock.withSleepMethod(mockSleep) {
+        val diag = "DIAG"
+        val mockYarnClient = mock[YarnClient]
+
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getDiagnostics).thenReturn(diag)
+        
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+        
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val mockSparkSubmit = mock[LineBufferedProcess]
+        when(mockSparkSubmit.isAlive).thenReturn(true)
+
+        val listener = new SparkAppListener {
+          var nStateChanged = 0
+          override def stateChanged(oldState: SparkApp.State, newState: 
SparkApp.State): Unit = {
+            nStateChanged += 1
+          }
+
+        }
+        val app = new SparkYarnApp(
+          appTag,
+          None,
+          Some(mockSparkSubmit),
+          Some(listener),
+          livyConf,
+          mockYarnClient)
+
+        cleanupThread(app.yarnAppMonitorThread) {
+          // 2 seconds is sufficient - yarn poll interval should be 200ms
+          app.yarnAppMonitorThread.join((2 seconds).toMillis)
+          assert(app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should not terminate if spark-submit is 
still alive.")
+          assert(listener.nStateChanged == 0,
+            "SparkYarnApp should not make state change.")
+        }
+      }
+    }
+
     it("should map YARN state to SparkApp.State correctly") {
       val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
       cleanupThread(app.yarnAppMonitorThread) {

Reply via email to