This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 9e24da7e8 [KYUUBI #5220][FOLLOWUP] Batch submitted considers 
application state
9e24da7e8 is described below

commit 9e24da7e8929a39283fee58605cb269cfbd45294
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Sep 14 12:13:20 2023 +0800

    [KYUUBI #5220][FOLLOWUP] Batch submitted considers application state
    
    ### _Why are the changes needed?_
    
    This PR aims to fix the `SparkSubmit` concurrency limit implemented in 
#5220.
    
    The "submitted" check introduced in #5220 only considered `OperationState`, but `ApplicationState` must be taken into account as well. For instance, if a batch is pending in the `ACCEPTED` state, the `SparkSubmit` process won't exit until the application changes to the `RUNNING` or `FAILED` state. In that case the `OperationState` is `RUNNING` while the `ApplicationState` is `PENDING`, so the batch should not be treated as "submitted".
    
    Additionally, this PR treats the metastore, instead of `SessionManager`, as the single source of truth for batches.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5279 from pan3793/5220-followup.
    
    Closes #5220
    
    903abc6d4 [Cheng Pan] Fix
    0af6738d2 [Cheng Pan] [KYUUBI #5220][FOLLOWUP] Batch submmited should not 
contain application pending state
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit dd2cd516d72631b464bcec2d4bcafc29d61d3880)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/server/KyuubiBatchService.scala  | 37 ++++++++++++----------
 .../kyuubi/server/metadata/api/Metadata.scala      | 12 ++++++-
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index bf10a68fa..2bfbbce2a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
+import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.service.{AbstractService, Serverable}
@@ -83,26 +84,28 @@ class KyuubiBatchService(
               metadata.requestArgs,
               Some(metadata),
               fromRecovery = false)
-            val sessionHandle = sessionManager.openBatchSession(batchSession)
+            sessionManager.openBatchSession(batchSession)
             var submitted = false
             while (!submitted) { // block until batch job submitted
-              submitted = sessionManager.getBatchSession(sessionHandle).map { 
batchSession =>
-                val batchState = 
batchSession.batchJobSubmissionOp.getStatus.state
-                batchState == OperationState.RUNNING || 
OperationState.isTerminal(batchState)
-              }.getOrElse {
-                error(s"Batch Session $batchId is not existed, marked as 
finished")
-                true
+              submitted = metadataManager.getBatchSessionMetadata(batchId) 
match {
+                case Some(metadata) if 
OperationState.isTerminal(metadata.opState) =>
+                  true
+                case Some(metadata) if metadata.opState == 
OperationState.RUNNING =>
+                  metadata.appState match {
+                    // app that is not submitted to resource manager
+                    case None | Some(ApplicationState.NOT_FOUND) => false
+                    // app that is pending in resource manager
+                    case Some(ApplicationState.PENDING) => false
+                    // not sure, added for safe
+                    case Some(ApplicationState.UNKNOWN) => false
+                    case _ => true
+                  }
+                case Some(_) =>
+                  false
+                case None =>
+                  error(s"$batchId does not existed in metastore, assume it is 
finished")
+                  true
               }
-              // should we always treat metastore as the single of truth?
-              //
-              // submitted = metadataManager.getBatchSessionMetadata(batchId) 
match {
-              //   case Some(metadata) =>
-              //     val batchState = OperationState.withName(metadata.state)
-              //     batchState == OperationState.RUNNING || 
OperationState.isTerminal(batchState)
-              //   case None =>
-              //     error(s"$batchId does not existed in metastore, assume it 
is finished")
-              //     true
-              // }
               if (!submitted) Thread.sleep(1000)
             }
             info(s"$batchId is submitted or finished.")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 12759f8cc..3e3d94828 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -18,7 +18,10 @@
 package org.apache.kyuubi.server.metadata.api
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationManagerInfo
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.session.SessionType.SessionType
 
 /**
@@ -82,4 +85,11 @@ case class Metadata(
       requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
       requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
   }
+
+  def opState: OperationState = {
+    assert(state != null, "invalid state, a normal batch record must have 
non-null state")
+    OperationState.withName(state)
+  }
+
+  def appState: Option[ApplicationState] = 
Option(engineState).map(ApplicationState.withName)
 }

Reply via email to