This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0417555bb [KYUUBI #5089] Limit maximum engine startup concurrency of 
kyuubi server
0417555bb is described below

commit 0417555bb0812ff3f39d0718a9583c3dfcce4eb5
Author: wforget <[email protected]>
AuthorDate: Mon Jul 31 13:50:34 2023 +0800

    [KYUUBI #5089] Limit maximum engine startup concurrency of kyuubi server
    
    ### _Why are the changes needed?_
    
    As reported by #4825, a large number of engine builder processes may cause 
high machine load on the kyuubi server, so I want to add a config to limit 
engine creation concurrency.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    Closes #5089 from wForget/engine_builder_limit.
    
    Closes #5089
    
    77507005d [wforget] comment
    774a8599b [wforget] comments
    373640fc0 [wforget] Limit maximum engine creation concurrency of kyuubi 
server
    ecc3b4af6 [mans2singh] [KYUUBI #5086] [KYUUBI # 5085] Update config section 
of deploy on kubernetes
    
    Lead-authored-by: wforget <[email protected]>
    Co-authored-by: mans2singh <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../main/scala/org/apache/kyuubi/config/KyuubiConf.scala   | 12 ++++++++++++
 .../main/scala/org/apache/kyuubi/engine/EngineRef.scala    | 14 ++++++++++++--
 .../org/apache/kyuubi/session/KyuubiSessionImpl.scala      |  3 ++-
 .../org/apache/kyuubi/session/KyuubiSessionManager.scala   | 11 ++++++++++-
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 65f03c654..44e3fcd17 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2981,4 +2981,16 @@ object KyuubiConf {
       .version("1.8.0")
       .booleanConf
       .createWithDefault(false)
+
+  val SERVER_LIMIT_ENGINE_CREATION: OptionalConfigEntry[Int] =
+    buildConf("kyuubi.server.limit.engine.startup")
+      .internal
+      .doc("The maximum engine startup concurrency of kyuubi server. Highly 
concurrent engine" +
+        " startup processes may lead to high load on the kyuubi server 
machine," +
+        " this configuration is used to limit the number of engine startup 
processes" +
+        " running at the same time to avoid it.")
+      .version("1.8.0")
+      .serverOnly
+      .intConf
+      .createOptional
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 123aec46c..ae40fbd4f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -57,7 +57,8 @@ private[kyuubi] class EngineRef(
     user: String,
     groupProvider: GroupProvider,
     engineRefId: String,
-    engineManager: KyuubiApplicationManager)
+    engineManager: KyuubiApplicationManager,
+    startupProcessSemaphore: Option[Semaphore] = None)
   extends Logging {
   // The corresponding ServerSpace where the engine belongs to
   private val serverSpace: String = conf.get(HA_NAMESPACE)
@@ -202,7 +203,15 @@ private[kyuubi] class EngineRef(
     }
 
     MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
+    var acquiredPermit = false
     try {
+      if (!startupProcessSemaphore.forall(_.tryAcquire(timeout, 
TimeUnit.MILLISECONDS))) {
+        MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, 
appUser)))
+        throw KyuubiSQLException(
+          s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to 
change it) to" +
+            s" acquires a permit from engine builder semaphore.")
+      }
+      acquiredPermit = true
       val redactedCmd = builder.toString
       info(s"Launching engine:\n$redactedCmd")
       builder.validateConf
@@ -267,6 +276,7 @@ private[kyuubi] class EngineRef(
       }
       engineRef.get
     } finally {
+      if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
       val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
       val destroyProcess = !waitCompletion && builder.isClusterMode()
       if (destroyProcess) {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 809be86f3..67eb6c86e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -77,7 +77,8 @@ class KyuubiSessionImpl(
     user,
     sessionManager.groupProvider,
     handle.identifier.toString,
-    sessionManager.applicationManager)
+    sessionManager.applicationManager,
+    sessionManager.engineStartupProcessSemaphore)
   private[kyuubi] val launchEngineOp = sessionManager.operationManager
     .newLaunchEngineOperation(this, 
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 8d63dfbf7..193bfbdfc 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.session
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 
@@ -63,6 +63,8 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   private var batchLimiter: Option[SessionLimiter] = None
   lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
 
+  var engineStartupProcessSemaphore: Option[Semaphore] = None
+
   private val engineConnectionAliveChecker =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
 
@@ -72,6 +74,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     addService(credentialsManager)
     metadataManager.foreach(addService)
     initSessionLimiter(conf)
+    initEngineStartupProcessSemaphore(conf)
     super.initialize(conf)
   }
 
@@ -360,4 +363,10 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       TimeUnit.MILLISECONDS)
   }
 
+  private def initEngineStartupProcessSemaphore(conf: KyuubiConf): Unit = {
+    val engineCreationLimit = conf.get(KyuubiConf.SERVER_LIMIT_ENGINE_CREATION)
+    engineCreationLimit.filter(_ > 0).foreach { limit =>
+      engineStartupProcessSemaphore = Some(new Semaphore(limit))
+    }
+  }
 }

Reply via email to