This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f33ad8b8e [KYUUBI #5220] Batch submitter should only block submitting 
stage
f33ad8b8e is described below

commit f33ad8b8e6c0ac534f2f970d59cf29f47eb44586
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Sep 1 03:19:36 2023 +0800

    [KYUUBI #5220] Batch submitter should only block submitting stage
    
    ### _Why are the changes needed?_
    
    This PR changes the phase during which the batch submitter blocks:
    
    - before: from PENDING until TERMINATED
    - after: from PENDING until RUNNING or TERMINATED
    
    Usually, we submit Spark batch applications in cluster mode with 
waitAppCompletion disabled. After a Spark application goes into the RUNNING or 
TERMINATED stage, the `spark-submit` process exits and the batch session does 
not occupy many resources. Thus, limiting the concurrency of the submitting 
phase instead of the whole lifecycle of the Spark app makes more sense.
    
    In practice, we use 16 threads for a Kyuubi instance with 8C32G. A larger 
concurrency may exhaust CPU resources and cause `spark-submit` processes to 
hang.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5220 from pan3793/batch-submitter-threads.
    
    Closes #5220
    
    543c31cd1 [Cheng Pan] nit
    dc5d0c816 [Cheng Pan] nit
    9b499b0eb [Cheng Pan] Batch submmitter should only block during submit stage
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../scala/org/apache/kyuubi/config/KyuubiConf.scala |  2 +-
 .../apache/kyuubi/server/KyuubiBatchService.scala   | 21 +++++++++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index bec69e95e..88c61e23d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1687,7 +1687,7 @@ object KyuubiConf {
         s"when ${BATCH_SUBMITTER_ENABLED.key} is enabled")
       .version("1.8.0")
       .intConf
-      .createWithDefault(100)
+      .createWithDefault(16)
 
   val BATCH_IMPL_VERSION: ConfigEntry[String] =
     buildConf("kyuubi.batch.impl.version")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 250662835..7ed2ab8e1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -92,27 +92,28 @@ class KyuubiBatchService(
               clusterManager = 
batchSession.batchJobSubmissionOp.builder.clusterManager())
             metadataManager.updateMetadata(metadataForUpdate, 
asyncRetryOnError = false)
             val sessionHandle = sessionManager.openBatchSession(batchSession)
-            var terminated = false
-            while (!terminated) { // block until batch job finished
-              terminated = sessionManager.getBatchSession(sessionHandle).map { 
batchSession =>
-                val batchOp = batchSession.batchJobSubmissionOp
-                OperationState.isTerminal(batchOp.getStatus.state)
+            var submitted = false
+            while (!submitted) { // block until batch job submitted
+              submitted = sessionManager.getBatchSession(sessionHandle).map { 
batchSession =>
+                val batchState = 
batchSession.batchJobSubmissionOp.getStatus.state
+                batchState == OperationState.RUNNING || 
OperationState.isTerminal(batchState)
               }.getOrElse {
                 error(s"Batch Session $batchId is not existed, marked as 
finished")
                 true
               }
               // should we always treat metastore as the single of truth?
               //
-              // terminated = metadataManager.getBatchSessionMetadata(batchId) 
match {
+              // submitted = metadataManager.getBatchSessionMetadata(batchId) 
match {
               //   case Some(metadata) =>
-              //     
OperationState.isTerminal(OperationState.withName(metadata.state))
+              //     val batchState = OperationState.withName(metadata.state)
+              //     batchState == OperationState.RUNNING || 
OperationState.isTerminal(batchState)
               //   case None =>
-              //     error(s"$batchId is not existed in metastore, assume it 
is finished")
+              //     error(s"$batchId does not existed in metastore, assume it 
is finished")
               //     true
               // }
-              if (!terminated) Thread.sleep(1000)
+              if (!submitted) Thread.sleep(1000)
             }
-            info(s"$batchId is finished.")
+            info(s"$batchId is submitted.")
         }
       }
     }

Reply via email to