This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4148e5b [KYUUBI #1960] Skip creating sparksession and starting engine
if the max initialization time exceeds
4148e5b is described below
commit 4148e5bd8d0b0307fcf70e10e7039bb9b60b1704
Author: wForget <[email protected]>
AuthorDate: Tue Mar 1 20:21:31 2022 +0800
[KYUUBI #1960] Skip creating sparksession and starting engine if the max
initialization time exceeds
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
close #1960
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [X] Add screenshots for manual tests if appropriate
Test steps:
1. Start another application that fills the queue, so the engine cannot obtain resources.
2. Modify `stop-application.sh` so that it does not stop the engine application.
3. Submit a Kyuubi request.
4. After the engine initialization timeout elapses, stop the application started in step 1.
Expected results:
1. The Kyuubi request exits after the timeout.
2. Once it obtains resources, the orphaned engine application detects that the
initialization timeout has been exceeded and exits immediately instead of creating a SparkSession.


- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1966 from wForget/KYUUBI-1960.
Closes #1960
0f977c85 [Wang Zhen] spotless
9bb2178c [Wang Zhen] Merge branch 'master' of
https://github.com/apache/incubator-kyuubi into KYUUBI-1960
3effa4d1 [wForget] fix
f9f13c6f [wForget] fix test
abdc1f30 [wForget] fix
48b4bac3 [wForget] [KYUUBI-1960] Skip creating sparksession and starting
engine if the max initialization time exceeds
Lead-authored-by: wForget <[email protected]>
Co-authored-by: Wang Zhen <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 69 ++++++++++++++--------
.../kyuubi/engine/spark/WithSparkSQLEngine.scala | 1 +
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 4 +-
4 files changed, 50 insertions(+), 25 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index b798cf9..c8839d8 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch,
currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore,
EventLoggingService}
import org.apache.kyuubi.events.EventLogging
@@ -70,6 +71,8 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
object SparkSQLEngine extends Logging {
+ val sparkConf: SparkConf = new SparkConf()
+
val kyuubiConf: KyuubiConf = KyuubiConf()
var currentEngine: Option[SparkSQLEngine] = None
@@ -78,8 +81,7 @@ object SparkSQLEngine extends Logging {
private val countDownLatch = new CountDownLatch(1)
- def createSpark(): SparkSession = {
- val sparkConf = new SparkConf()
+ def setupConf(): Unit = {
val rootDir =
sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(sparkConf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold",
"10000")
@@ -112,11 +114,16 @@ object SparkSQLEngine extends Logging {
debug(s"KyuubiConf: $k = $v")
}
}
+ }
+ def createSpark(): SparkSession = {
val session = SparkSession.builder.config(sparkConf).getOrCreate
(kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++
kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
.foreach { sqlStr =>
- session.sparkContext.setJobGroup(appName, sqlStr, interruptOnCancel =
true)
+ session.sparkContext.setJobGroup(
+ "engine_initializing_queries",
+ sqlStr,
+ interruptOnCancel = true)
debug(s"Execute session initializing sql: $sqlStr")
session.sql(sqlStr).isEmpty
}
@@ -168,29 +175,43 @@ object SparkSQLEngine extends Logging {
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
- var spark: SparkSession = null
- try {
- spark = createSpark()
+ setupConf()
+ val startedTime = System.currentTimeMillis()
+ val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match
{
+ case Some(t) => t.toLong
+ case _ => startedTime
+ }
+ val initTimeout = kyuubiConf.get(ENGINE_INIT_TIMEOUT)
+ val totalInitTime = startedTime - submitTime
+ if (totalInitTime > initTimeout) {
+ throw new KyuubiException(s"The total engine initialization time
($totalInitTime ms)" +
+ s" exceeds `kyuubi.session.engine.initialize.timeout` ($initTimeout
ms)," +
+ s" and submitted at $submitTime.")
+ } else {
+ var spark: SparkSession = null
try {
- startEngine(spark)
- // blocking main thread
- countDownLatch.await()
+ spark = createSpark()
+ try {
+ startEngine(spark)
+ // blocking main thread
+ countDownLatch.await()
+ } catch {
+ case e: KyuubiException => currentEngine match {
+ case Some(engine) =>
+ engine.stop()
+ val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
+ EventLogging.onEvent(event)
+ error(event, e)
+ case _ => error("Current SparkSQLEngine is not created.")
+ }
+
+ }
} catch {
- case e: KyuubiException => currentEngine match {
- case Some(engine) =>
- engine.stop()
- val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
- EventLogging.onEvent(event)
- error(event, e)
- case _ => error("Current SparkSQLEngine is not created.")
- }
-
- }
- } catch {
- case t: Throwable => error(s"Failed to instantiate SparkSession:
${t.getMessage}", t)
- } finally {
- if (spark != null) {
- spark.stop()
+ case t: Throwable => error(s"Failed to instantiate SparkSession:
${t.getMessage}", t)
+ } finally {
+ if (spark != null) {
+ spark.stop()
+ }
}
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index 4f428ef..e292e71 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -66,6 +66,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
+ SparkSQLEngine.setupConf()
spark = SparkSQLEngine.createSpark()
SparkSQLEngine.startEngine(spark)
engine = SparkSQLEngine.currentEngine.get
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index e4e280a..1a30078 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -20,4 +20,5 @@ package org.apache.kyuubi.config
object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
+ final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a826a36..21a8a89 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL,
TRINO}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER,
ShareLevel}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
@@ -178,6 +179,8 @@ private[kyuubi] class EngineRef(
conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
+ val started = System.currentTimeMillis()
+ conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
val builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
@@ -203,7 +206,6 @@ private[kyuubi] class EngineRef(
try {
info(s"Launching engine:\n$builder")
val process = builder.start
- val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {