This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new 46346dd9cd [KYUUBI #7226] Support to wait the batch recovery 
appliction submission to throttle the load on the system
46346dd9cd is described below

commit 46346dd9cd78429d3d519903dae405c7c2aff86d
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Dec 10 23:07:53 2025 -0800

    [KYUUBI #7226] Support to wait the batch recovery appliction submission to 
throttle the load on the system
    
    ### Why are the changes needed?
    
    Support waiting for the batch recovery application submission to throttle the 
load on the system.
    
    Add a new config, kyuubi.metadata.recovery.waitEngineSubmission, to control it:
    
    Whether a metadata recovery task should wait for its corresponding engine 
submission to complete before finishing. All recovery tasks are submitted to a 
fixed thread pool controlled by kyuubi.metadata.recovery.threads. If true, a 
task blocks until the engine submission is done, helping throttle the load on 
the system if kyuubi.session.engine.startup.waitCompletion is false. If false, 
the task returns immediately after opening the session without waiting.
    
    Close #7226
    ### How was this patch tested?
    
    GA.
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7262 from turboFei/recover_concurrent.
    
    Closes #7226
    
    ea6282d54 [Wang, Fei] config
    2b0403d3f [Wang, Fei] refine docs
    b5c51010f [Wang, Fei] refine
    f6b510c5e [Wang, Fei] 1.10.3
    b892c7159 [Wang, Fei] Support to wait the batch recovery appliction 
submission to throttle the load on the system
    c4740dc89 [Wang, Fei] conf
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 572cef805fb83c117570285a46040832bd85a4ea)
    Signed-off-by: Wang, Fei <[email protected]>
---
 docs/configuration/settings.md                            |  1 +
 .../main/scala/org/apache/kyuubi/config/KyuubiConf.scala  | 13 +++++++++++++
 .../apache/kyuubi/server/KyuubiRestFrontendService.scala  | 15 ++++++++++++++-
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index b143bae04d..60bc34a747 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -384,6 +384,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.metadata.cleaner.interval                | PT30M                      
                              | The interval to check and clean expired 
metadata.                                                                       
                                                                                
                                                                                
                                                                                
                      [...]
 | kyuubi.metadata.max.age                         | PT72H                      
                              | The maximum age of metadata, the metadata 
exceeding the age will be cleaned.                                              
                                                                                
                                                                                
                                                                                
                    [...]
 | kyuubi.metadata.recovery.threads                | 10                         
                              | The number of threads for recovery from the 
metadata store when the Kyuubi server restarts.                                 
                                                                                
                                                                                
                                                                                
                  [...]
+| kyuubi.metadata.recovery.waitEngineSubmission   | false                      
                              | Whether a metadata recovery task should wait 
for its corresponding engine submission to complete before finishing. All 
recovery tasks are submitted to a fixed thread pool controlled by 
kyuubi.metadata.recovery.threads. If true, a task blocks until the engine 
submission is done, helping throttle the load on the system if 
kyuubi.session.engine.startup.waitCompletion is false. If f [...]
 | kyuubi.metadata.request.async.retry.enabled     | true                       
                              | Whether to retry in async when metadata request 
failed. When true, return success response immediately even the metadata 
request failed, and schedule it in background until success, to tolerate 
long-time metadata store outages w/o blocking the submission request.           
                                                                                
                            [...]
 | kyuubi.metadata.request.async.retry.queue.size  | 65536                      
                              | The maximum queue size for buffering metadata 
requests in memory when the external metadata storage is down. Requests will be 
dropped if the queue exceeds. Only take affect when 
kyuubi.metadata.request.async.retry.enabled is `true`.                          
                                                                                
                                            [...]
 | kyuubi.metadata.request.async.retry.threads     | 10                         
                              | Number of threads in the metadata request async 
retry manager thread pool. Only take affect when 
kyuubi.metadata.request.async.retry.enabled is `true`.                          
                                                                                
                                                                                
                                             [...]
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 403c84d5a4..e5ae51e3f2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2066,6 +2066,19 @@ object KyuubiConf {
       .intConf
       .createWithDefault(10)
 
+  val METADATA_RECOVERY_WAIT_ENGINE_SUBMISSION: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.recovery.waitEngineSubmission")
+      .serverOnly
+      .doc("Whether a metadata recovery task should wait for its corresponding 
engine " +
+        "submission to complete before finishing. All recovery tasks are 
submitted to a fixed " +
+        s"thread pool controlled by ${METADATA_RECOVERY_THREADS.key}. If true, 
a task blocks " +
+        "until the engine submission is done, helping throttle the load on the 
system " +
+        s"if ${SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key} is false. " +
+        "If false, the task returns immediately after opening the session 
without waiting.")
+      .version("1.10.3")
+      .booleanConf
+      .createWithDefault(false)
+
   val METADATA_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
     buildConf("kyuubi.metadata.request.retry.interval")
       .serverOnly
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 7fa111c7ff..7507a300e6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import 
org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
+import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.api.v1.ApiRootResource
 import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, 
KyuubiHttpAuthenticationFactory}
 import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
@@ -170,6 +171,7 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
   @VisibleForTesting
   private[kyuubi] def recoverBatchSessions(): Unit = {
     val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
+    val recoveryWaitEngineSubmission = 
conf.get(METADATA_RECOVERY_WAIT_ENGINE_SUBMISSION)
     val batchRecoveryExecutor =
       ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, 
"batch-recovery-executor")
     try {
@@ -179,7 +181,18 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
         val batchId = batchSession.batchJobSubmissionOp.batchId
         try {
           val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
-            
Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
+            Utils.tryLogNonFatalError {
+              sessionManager.openBatchSession(batchSession)
+              if (recoveryWaitEngineSubmission) {
+                info(s"Waiting for batch[$batchId] engine submission during 
recovery")
+                val batchOp = batchSession.batchJobSubmissionOp
+                while 
(batchSession.getSessionEvent.forall(_.exception.isEmpty) &&
+                  !batchOp.appStarted &&
+                  !OperationState.isTerminal(batchOp.getStatus.state)) {
+                  Thread.sleep(300)
+                }
+              }
+            })
           Some(task -> batchId)
         } catch {
           case e: Throwable =>

Reply via email to