This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0417555bb [KYUUBI #5089] Limit maximum engine startup concurrency of
kyuubi server
0417555bb is described below
commit 0417555bb0812ff3f39d0718a9583c3dfcce4eb5
Author: wforget <[email protected]>
AuthorDate: Mon Jul 31 13:50:34 2023 +0800
[KYUUBI #5089] Limit maximum engine startup concurrency of kyuubi server
### _Why are the changes needed?_
As reported by #4825, a large number of engine builder processes may cause
high machine load on the kyuubi server, So I want to add a config to limit
engine creation concurrency.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
Closes #5089 from wForget/engine_builder_limit.
Closes #5089
77507005d [wforget] comment
774a8599b [wforget] comments
373640fc0 [wforget] Limit maximum engine creation concurrency of kyuubi
server
ecc3b4af6 [mans2singh] [KYUUBI #5086] [KYUUBI # 5085] Update config section
of deploy on kubernetes
Lead-authored-by: wforget <[email protected]>
Co-authored-by: mans2singh <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 12 ++++++++++++
.../main/scala/org/apache/kyuubi/engine/EngineRef.scala | 14 ++++++++++++--
.../org/apache/kyuubi/session/KyuubiSessionImpl.scala | 3 ++-
.../org/apache/kyuubi/session/KyuubiSessionManager.scala | 11 ++++++++++-
4 files changed, 36 insertions(+), 4 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 65f03c654..44e3fcd17 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2981,4 +2981,16 @@ object KyuubiConf {
.version("1.8.0")
.booleanConf
.createWithDefault(false)
+
+ val SERVER_LIMIT_ENGINE_CREATION: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.server.limit.engine.startup")
+ .internal
+ .doc("The maximum engine startup concurrency of kyuubi server. Highly
concurrent engine" +
+ " startup processes may lead to high load on the kyuubi server
machine," +
+ " this configuration is used to limit the number of engine startup
processes" +
+ " running at the same time to avoid it.")
+ .version("1.8.0")
+ .serverOnly
+ .intConf
+ .createOptional
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 123aec46c..ae40fbd4f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Random
@@ -57,7 +57,8 @@ private[kyuubi] class EngineRef(
user: String,
groupProvider: GroupProvider,
engineRefId: String,
- engineManager: KyuubiApplicationManager)
+ engineManager: KyuubiApplicationManager,
+ startupProcessSemaphore: Option[Semaphore] = None)
extends Logging {
// The corresponding ServerSpace where the engine belongs to
private val serverSpace: String = conf.get(HA_NAMESPACE)
@@ -202,7 +203,15 @@ private[kyuubi] class EngineRef(
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
+ var acquiredPermit = false
try {
+ if (!startupProcessSemaphore.forall(_.tryAcquire(timeout,
TimeUnit.MILLISECONDS))) {
+ MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT,
appUser)))
+ throw KyuubiSQLException(
+ s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to
change it) to" +
+ s" acquires a permit from engine builder semaphore.")
+ }
+ acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
builder.validateConf
@@ -267,6 +276,7 @@ private[kyuubi] class EngineRef(
}
engineRef.get
} finally {
+ if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
val destroyProcess = !waitCompletion && builder.isClusterMode()
if (destroyProcess) {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 809be86f3..67eb6c86e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -77,7 +77,8 @@ class KyuubiSessionImpl(
user,
sessionManager.groupProvider,
handle.identifier.toString,
- sessionManager.applicationManager)
+ sessionManager.applicationManager,
+ sessionManager.engineStartupProcessSemaphore)
private[kyuubi] val launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this,
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 8d63dfbf7..193bfbdfc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.session
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
import scala.collection.JavaConverters._
@@ -63,6 +63,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
private var batchLimiter: Option[SessionLimiter] = None
lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
+ var engineStartupProcessSemaphore: Option[Semaphore] = None
+
private val engineConnectionAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
@@ -72,6 +74,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
addService(credentialsManager)
metadataManager.foreach(addService)
initSessionLimiter(conf)
+ initEngineStartupProcessSemaphore(conf)
super.initialize(conf)
}
@@ -360,4 +363,10 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
TimeUnit.MILLISECONDS)
}
+ private def initEngineStartupProcessSemaphore(conf: KyuubiConf): Unit = {
+ val engineCreationLimit = conf.get(KyuubiConf.SERVER_LIMIT_ENGINE_CREATION)
+ engineCreationLimit.filter(_ > 0).foreach { limit =>
+ engineStartupProcessSemaphore = Some(new Semaphore(limit))
+ }
+ }
}