This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4148e5b  [KYUUBI #1960] Skip creating SparkSession and starting engine if the max initialization time is exceeded
4148e5b is described below

commit 4148e5bd8d0b0307fcf70e10e7039bb9b60b1704
Author: wForget <[email protected]>
AuthorDate: Tue Mar 1 20:21:31 2022 +0800

    [KYUUBI #1960] Skip creating SparkSession and starting engine if the max initialization time is exceeded
    
    ### _Why are the changes needed?_
    close #1960
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [X] Add screenshots for manual tests if appropriate
    
    Test steps:
    1. Start another application to fill the queue.
    2. Modify `stop-application.sh` so that it does not work.
    3. Submit a Kyuubi request.
    4. After the timeout elapses, stop the application from step 1.
    
    We can get the following results (the added timeout guard is sketched after this section):
    1. The Kyuubi request exits after the timeout.
    2. After obtaining resources, the orphaned engine app fails to execute and exits immediately.
    
![image](https://user-images.githubusercontent.com/17894939/155467408-f9c5d9d3-ccb9-47a6-b697-1d64f42e427e.png)
    
![image](https://user-images.githubusercontent.com/17894939/155467301-7ee01616-10dd-4c25-9375-02c0ae88091e.png)
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
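
    The core of the change is a guard in `SparkSQLEngine.main` that compares the engine's start time with the submit time stamped by the server. Below is a minimal sketch of that guard, condensed from the diff further down; the helper name `checkInitTimeout` is illustrative and not part of the patch:

    ```scala
    import org.apache.kyuubi.KyuubiException
    import org.apache.kyuubi.config.KyuubiConf
    import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
    import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY

    // Refuse to build a SparkSession when the time elapsed since the server
    // submitted the engine already exceeds the configured initialization timeout.
    def checkInitTimeout(kyuubiConf: KyuubiConf): Unit = {
      val startedTime = System.currentTimeMillis()
      // Fall back to "now" if the server did not pass a submit timestamp.
      val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY)
        .map(_.toLong)
        .getOrElse(startedTime)
      val initTimeout = kyuubiConf.get(ENGINE_INIT_TIMEOUT)
      val totalInitTime = startedTime - submitTime
      if (totalInitTime > initTimeout) {
        throw new KyuubiException(
          s"The total engine initialization time ($totalInitTime ms) exceeds" +
            s" `kyuubi.session.engine.initialize.timeout` ($initTimeout ms).")
      }
    }
    ```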
    
    Closes #1966 from wForget/KYUUBI-1960.
    
    Closes #1960
    
    0f977c85 [Wang Zhen] spotless
    9bb2178c [Wang Zhen] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into KYUUBI-1960
    3effa4d1 [wForget] fix
    f9f13c6f [wForget] fix test
    abdc1f30 [wForget] fix
    48b4bac3 [wForget] [KYUUBI-1960] Skip creating SparkSession and starting engine if the max initialization time is exceeded
    
    Lead-authored-by: wForget <[email protected]>
    Co-authored-by: Wang Zhen <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 69 ++++++++++++++--------
 .../kyuubi/engine/spark/WithSparkSQLEngine.scala   |  1 +
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  1 +
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  4 +-
 4 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index b798cf9..c8839d8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.Utils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService}
 import org.apache.kyuubi.events.EventLogging
@@ -70,6 +71,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
 object SparkSQLEngine extends Logging {
 
+  val sparkConf: SparkConf = new SparkConf()
+
   val kyuubiConf: KyuubiConf = KyuubiConf()
 
   var currentEngine: Option[SparkSQLEngine] = None
@@ -78,8 +81,7 @@ object SparkSQLEngine extends Logging {
 
   private val countDownLatch = new CountDownLatch(1)
 
-  def createSpark(): SparkSession = {
-    val sparkConf = new SparkConf()
+  def setupConf(): Unit = {
     val rootDir = sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(sparkConf))
     val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
     sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold", "10000")
@@ -112,11 +114,16 @@ object SparkSQLEngine extends Logging {
         debug(s"KyuubiConf: $k = $v")
       }
     }
+  }
 
+  def createSpark(): SparkSession = {
     val session = SparkSession.builder.config(sparkConf).getOrCreate
     (kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
       .foreach { sqlStr =>
-        session.sparkContext.setJobGroup(appName, sqlStr, interruptOnCancel = true)
+        session.sparkContext.setJobGroup(
+          "engine_initializing_queries",
+          sqlStr,
+          interruptOnCancel = true)
         debug(s"Execute session initializing sql: $sqlStr")
         session.sql(sqlStr).isEmpty
       }
@@ -168,29 +175,43 @@ object SparkSQLEngine extends Logging {
 
   def main(args: Array[String]): Unit = {
     SignalRegister.registerLogger(logger)
-    var spark: SparkSession = null
-    try {
-      spark = createSpark()
+    setupConf()
+    val startedTime = System.currentTimeMillis()
+    val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
+      case Some(t) => t.toLong
+      case _ => startedTime
+    }
+    val initTimeout = kyuubiConf.get(ENGINE_INIT_TIMEOUT)
+    val totalInitTime = startedTime - submitTime
+    if (totalInitTime > initTimeout) {
+      throw new KyuubiException(s"The total engine initialization time ($totalInitTime ms)" +
+        s" exceeds `kyuubi.session.engine.initialize.timeout` ($initTimeout ms)," +
+        s" and submitted at $submitTime.")
+    } else {
+      var spark: SparkSession = null
       try {
-        startEngine(spark)
-        // blocking main thread
-        countDownLatch.await()
+        spark = createSpark()
+        try {
+          startEngine(spark)
+          // blocking main thread
+          countDownLatch.await()
+        } catch {
+          case e: KyuubiException => currentEngine match {
+              case Some(engine) =>
+                engine.stop()
+                val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
+                EventLogging.onEvent(event)
+                error(event, e)
+              case _ => error("Current SparkSQLEngine is not created.")
+            }
+
+        }
       } catch {
-        case e: KyuubiException => currentEngine match {
-            case Some(engine) =>
-              engine.stop()
-              val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
-              EventLogging.onEvent(event)
-              error(event, e)
-            case _ => error("Current SparkSQLEngine is not created.")
-          }
-
-      }
-    } catch {
-      case t: Throwable => error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
-    } finally {
-      if (spark != null) {
-        spark.stop()
+        case t: Throwable => error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
+      } finally {
+        if (spark != null) {
+          spark.stop()
+        }
       }
     }
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index 4f428ef..e292e71 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -66,6 +66,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
 
     SparkSession.clearActiveSession()
     SparkSession.clearDefaultSession()
+    SparkSQLEngine.setupConf()
     spark = SparkSQLEngine.createSpark()
     SparkSQLEngine.startEngine(spark)
     engine = SparkSQLEngine.currentEngine.get
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index e4e280a..1a30078 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -20,4 +20,5 @@ package org.apache.kyuubi.config
 object KyuubiReservedKeys {
   final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
+  final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index a826a36..21a8a89 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
 import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL, TRINO}
 import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
@@ -178,6 +179,8 @@ private[kyuubi] class EngineRef(
 
     conf.set(HA_ZK_NAMESPACE, engineSpace)
     conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
+    val started = System.currentTimeMillis()
+    conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
     val builder = engineType match {
       case SPARK_SQL =>
         conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
@@ -203,7 +206,6 @@ private[kyuubi] class EngineRef(
     try {
       info(s"Launching engine:\n$builder")
       val process = builder.start
-      val started = System.currentTimeMillis()
       var exitValue: Option[Int] = None
       while (engineRef.isEmpty) {
         if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
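
The hunk above is the server-side half of the handshake: `EngineRef` now records the wall-clock submit time in the engine configuration right before launching the builder (moving the `started` timestamp up out of the wait loop), and the engine reads it back through `KYUUBI_ENGINE_SUBMIT_TIME_KEY`. A minimal sketch of that half, assuming `conf` is the session-level `KyuubiConf` forwarded to the engine process; the helper name `stampSubmitTime` is illustrative only:

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY

// Record when the engine was submitted, so the engine itself can tell how long
// it sat waiting (for example on YARN resources) before it could initialize.
def stampSubmitTime(conf: KyuubiConf): Unit = {
  val started = System.currentTimeMillis()
  conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
}
```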
