This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 572cef805f [KYUUBI #7226] Support waiting for the batch recovery
application submission to throttle the load on the system
572cef805f is described below
commit 572cef805fb83c117570285a46040832bd85a4ea
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Dec 10 23:07:53 2025 -0800
[KYUUBI #7226] Support waiting for the batch recovery application submission to
throttle the load on the system
### Why are the changes needed?
Support waiting for the batch recovery application submission to throttle the
load on the system.
Add a new config, `kyuubi.metadata.recovery.waitEngineSubmission`, to control it:
Whether a metadata recovery task should wait for its corresponding engine
submission to complete before finishing. All recovery tasks are submitted to a
fixed thread pool controlled by kyuubi.metadata.recovery.threads. If true, a
task blocks until the engine submission is done, helping throttle the load on
the system if kyuubi.session.engine.startup.waitCompletion is false. If false,
the task returns immediately after opening the session without waiting.
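For illustration only, a `kyuubi-defaults.conf` fragment that turns the throttling on might look like the sketch below. The keys and the default of 10 threads come from this commit; the whitespace-separated key/value layout and the chosen values are assumptions, not a recommended setup.

```properties
# Size of the fixed pool that runs recovery tasks (10 is the default).
kyuubi.metadata.recovery.threads                10
# Make each recovery task block until its engine submission is done.
kyuubi.metadata.recovery.waitEngineSubmission   true
# The case in which the description above says the throttling helps.
kyuubi.session.engine.startup.waitCompletion    false
```

With these values, roughly 10 recovered batches at most should be waiting on engine submission at any time, since each pool thread holds its task until the submission finishes.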
Close #7226
### How was this patch tested?
GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7262 from turboFei/recover_concurrent.
Closes #7226
ea6282d54 [Wang, Fei] config
2b0403d3f [Wang, Fei] refine docs
b5c51010f [Wang, Fei] refine
f6b510c5e [Wang, Fei] 1.10.3
b892c7159 [Wang, Fei] Support waiting for the batch recovery application
submission to throttle the load on the system
c4740dc89 [Wang, Fei] conf
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
docs/configuration/settings.md | 1 +
.../main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 13 +++++++++++++
.../apache/kyuubi/server/KyuubiRestFrontendService.scala | 15 ++++++++++++++-
3 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index ab1966cde6..bd45438e37 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -387,6 +387,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.metadata.cleaner.interval | PT30M | The interval to check and clean expired metadata. [...]
 | kyuubi.metadata.max.age | PT72H | The maximum age of metadata, the metadata exceeding the age will be cleaned. [...]
 | kyuubi.metadata.recovery.threads | 10 | The number of threads for recovery from the metadata store when the Kyuubi server restarts. [...]
+| kyuubi.metadata.recovery.waitEngineSubmission | false | Whether a metadata recovery task should wait for its corresponding engine submission to complete before finishing. All recovery tasks are submitted to a fixed thread pool controlled by kyuubi.metadata.recovery.threads. If true, a task blocks until the engine submission is done, helping throttle the load on the system if kyuubi.session.engine.startup.waitCompletion is false. If f [...]
 | kyuubi.metadata.request.async.retry.enabled | true | Whether to retry in async when metadata request failed. When true, return success response immediately even the metadata request failed, and schedule it in background until success, to tolerate long-time metadata store outages w/o blocking the submission request. [...]
 | kyuubi.metadata.request.async.retry.queue.size | 65536 | The maximum queue size for buffering metadata requests in memory when the external metadata storage is down. Requests will be dropped if the queue exceeds. Only take affect when kyuubi.metadata.request.async.retry.enabled is `true`. [...]
 | kyuubi.metadata.request.async.retry.threads | 10 | Number of threads in the metadata request async retry manager thread pool. Only take affect when kyuubi.metadata.request.async.retry.enabled is `true`. [...]
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index eb17e222ed..c1fad17ec0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2104,6 +2104,19 @@ object KyuubiConf {
       .intConf
       .createWithDefault(10)
 
+  val METADATA_RECOVERY_WAIT_ENGINE_SUBMISSION: ConfigEntry[Boolean] =
+    buildConf("kyuubi.metadata.recovery.waitEngineSubmission")
+      .serverOnly
+      .doc("Whether a metadata recovery task should wait for its corresponding engine " +
+        "submission to complete before finishing. All recovery tasks are submitted to a fixed " +
+        s"thread pool controlled by ${METADATA_RECOVERY_THREADS.key}. If true, a task blocks " +
+        "until the engine submission is done, helping throttle the load on the system " +
+        s"if ${SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key} is false. " +
+        "If false, the task returns immediately after opening the session without waiting.")
+      .version("1.10.3")
+      .booleanConf
+      .createWithDefault(false)
+
   val METADATA_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
     buildConf("kyuubi.metadata.request.retry.interval")
       .serverOnly
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 787ac0b047..8f9be59a54 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -34,6 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
+import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.api.v1.ApiRootResource
 import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
 import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
@@ -181,6 +182,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
   @VisibleForTesting
   private[kyuubi] def recoverBatchSessions(): Unit = withBatchRecoveryLockRequired {
     val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
+    val recoveryWaitEngineSubmission = conf.get(METADATA_RECOVERY_WAIT_ENGINE_SUBMISSION)
     val batchRecoveryExecutor =
       ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
     try {
@@ -190,7 +192,18 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
         val batchId = batchSession.batchJobSubmissionOp.batchId
         try {
           val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
-            Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
+            Utils.tryLogNonFatalError {
+              sessionManager.openBatchSession(batchSession)
+              if (recoveryWaitEngineSubmission) {
+                info(s"Waiting for batch[$batchId] engine submission during recovery")
+                val batchOp = batchSession.batchJobSubmissionOp
+                while (batchSession.getSessionEvent.forall(_.exception.isEmpty) &&
+                  !batchOp.appStarted &&
+                  !OperationState.isTerminal(batchOp.getStatus.state)) {
+                  Thread.sleep(300)
+                }
+              }
+            })
           Some(task -> batchId)
         } catch {
           case e: Throwable =>