This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd8852b6 [KYUUBI #5002] Fail the engine fast when no incoming 
connection in CONNECTION mode
ffd8852b6 is described below

commit ffd8852b608522ec3c5e0458b66a67787f778089
Author: Xieming LI <[email protected]>
AuthorDate: Sun Jul 16 23:00:16 2023 +0800

    [KYUUBI #5002] Fail the engine fast when no incoming connection in 
CONNECTION mode
    
    ### _Why are the changes needed?_
    Please refer to #4997
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    1. Connect to KyuubiServer with beeline
    2. Confirm the application is ACCEPTED in ResourceManager, then restart 
KyuubiServer
    3. Confirm that the engine is terminated shortly afterwards
    ```
    23/06/28 10:44:59 INFO storage.BlockManagerMaster: Removed 1 successfully 
in removeExecutor
    23/06/28 10:45:00 INFO spark.SparkSQLEngine: Current open session is 0
    23/06/28 10:45:00 ERROR spark.SparkSQLEngine: Spark engine has been 
terminated because no incoming connection for more than 60000 ms, deregistering 
from engine discovery space.
    23/06/28 10:45:00 WARN zookeeper.ZookeeperDiscoveryClient: This Kyuubi 
instance lniuhpi1616.nhnjp.ism:46588 is now de-registered from ZooKeeper. The 
server will be shut down after the last client session completes.
    23/06/28 10:45:00 INFO spark.SparkSQLEngine: Service: 
[SparkTBinaryFrontend] is stopping.
    ```
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    Closes #5002 from risyomei/feature/failfast.
    
    Closes #5002
    
    402d6c01f [Xieming LI] Changed runInNewThread based on comment
    58f11e157 [Xieming LI] Changed runInNewThread to non-blocking
    c6bb02d6a [Xieming LI] Fixed Unit Test
    168d996d0 [Xieming LI] Start countdown after engine is started
    48ee819f2 [Xieming LI] Fixed a typo
    a8d305942 [Xieming LI] Using runInNewThread ported from Spark
    21f0671df [Xieming LI] Updated document
    a7d5d1082 [Xieming LI] Changed the default value to turn off this feature
    437be512d [Xieming LI] Trigger CI to test again
    42a847e84 [Xieming LI] Added Configuration for timeout, changed to 
ThreadPoolExecutor
    639bd5239 [Xieming LI] Fail the engine fast when no incoming connection in 
CONNECTION mode
    
    Authored-by: Xieming LI <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 28 ++++++++++++++++++++++
 .../spark/EtcdShareLevelSparkEngineSuite.scala     |  5 ++--
 .../ZookeeperShareLevelSparkEngineSuite.scala      |  2 ++
 .../kyuubi/engine/spark/session/SessionSuite.scala |  4 +++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 10 ++++++++
 .../apache/kyuubi/util/NamedThreadFactory.scala    |  2 +-
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala | 14 +++++++++++
 .../KyuubiOperationPerConnectionSuite.scala        |  1 +
 9 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ca29592e2..2d2c589ae 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -413,6 +413,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.session.engine.open.retry.wait                | PT10S                 
  | How long to wait before retrying to open the engine after failure.          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | kyuubi.session.engine.share.level                    | USER                  
  | (deprecated) - Using kyuubi.engine.share.level instead                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | kyuubi.session.engine.spark.main.resource            | &lt;undefined&gt;     
  | The package used to create Spark SQL engine remote application. If it is 
undefined, Kyuubi will use the default                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| kyuubi.session.engine.spark.max.initial.wait         | PT1M                  
  | Max wait time for the initial connection to Spark engine. The engine will 
self-terminate no new incoming connection is established within this time. This 
setting only applies at the CONNECTION share level. 0 or negative means not to 
self-terminate.                                                                 
                                                                                
                 [...]
 | kyuubi.session.engine.spark.max.lifetime             | PT0S                  
  | Max lifetime for Spark engine, the engine will self-terminate when it 
reaches the end of life. 0 or negative means not to self-terminate.             
                                                                                
                                                                                
                                                                                
                    [...]
 | kyuubi.session.engine.spark.progress.timeFormat      | yyyy-MM-dd 
HH:mm:ss.SSS | The time format of the progress bar                              
                                                                                
                                                                                
                                                                                
                                                                                
                         [...]
 | kyuubi.session.engine.spark.progress.update.interval | PT1S                  
  | Update period of progress bar.                                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index bdbc7c08f..b94367e9e 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -37,6 +37,7 @@ import org.apache.kyuubi.Utils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import 
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
+import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, 
currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, 
SparkEventHandlerRegister}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -80,6 +81,12 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
       assert(currentEngine.isDefined)
       currentEngine.get.stop()
     })
+
+    val maxInitTimeout = conf.get(ENGINE_SPARK_MAX_INITIAL_WAIT)
+    if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString &&
+      maxInitTimeout > 0) {
+      startFastFailChecker(maxInitTimeout)
+    }
   }
 
   override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -114,6 +121,27 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
     stopEngineExec.get.execute(stopTask)
   }
 
+  private[kyuubi] def startFastFailChecker(maxTimeout: Long): Unit = {
+    val startedTime = System.currentTimeMillis()
+    Utils.tryLogNonFatalError {
+      ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
+        if (!shutdown.get) {
+          while (backendService.sessionManager.getOpenSessionCount <= 0 &&
+            System.currentTimeMillis() - startedTime < maxTimeout) {
+            info(s"Waiting for the initial connection")
+            Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis)
+          }
+          if (backendService.sessionManager.getOpenSessionCount <= 0) {
+            error(s"Spark engine has been terminated because no incoming 
connection" +
+              s" for more than $maxTimeout ms, de-registering from engine 
discovery space.")
+            assert(currentEngine.isDefined)
+            currentEngine.get.stop()
+          }
+        }
+      }
+    }
+  }
+
   override protected def stopServer(): Unit = {
     countDownLatch.countDown()
   }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
index 46dc3b54c..727b232e3 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.kyuubi.engine.spark
 
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, 
ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel
 
@@ -30,6 +28,7 @@ trait EtcdShareLevelSparkEngineSuite
       etcdConf ++ Map(
         ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
         ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+        ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
         ENGINE_CHECK_INTERVAL.key -> "PT5s")
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
index 4ef96e61a..f24abb36c 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
 
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel
@@ -30,6 +31,7 @@ trait ZookeeperShareLevelSparkEngineSuite
       zookeeperConf ++ Map(
         ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
         ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+        ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
         ENGINE_CHECK_INTERVAL.key -> "PT5s")
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
index 5e0b6c28e..b89c560b3 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
@@ -27,7 +27,9 @@ import org.apache.kyuubi.service.ServiceState._
 
 class SessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
   override def withKyuubiConf: Map[String, String] = {
-    Map(ENGINE_SHARE_LEVEL.key -> "CONNECTION")
+    Map(
+      ENGINE_SHARE_LEVEL.key -> "CONNECTION",
+      ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0")
   }
 
   override protected def beforeEach(): Unit = {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 669f72da0..b8a75d27f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1282,6 +1282,16 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(0)
 
+  val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.spark.max.initial.wait")
+      .doc("Max wait time for the initial connection to Spark engine. The 
engine will" +
+        " self-terminate no new incoming connection is established within this 
time." +
+        " This setting only applies at the CONNECTION share level." +
+        " 0 or negative means not to self-terminate.")
+      .version("1.8.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(60).toMillis)
+
   val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("kyuubi.session.engine.flink.main.resource")
       .doc("The package used to create Flink SQL engine remote job. If it is 
undefined," +
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
index 13127b59b..3ce421e23 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
@@ -32,5 +32,5 @@ class NamedThreadFactory(name: String, daemon: Boolean) 
extends ThreadFactory {
 }
 
 object NamedThreadFactory {
-  private val kyuubiUncaughtExceptionHandler = new 
KyuubiUncaughtExceptionHandler
+  private[util] val kyuubiUncaughtExceptionHandler = new 
KyuubiUncaughtExceptionHandler
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 8ce4bb2e5..76d3f416f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -95,4 +95,18 @@ object ThreadUtils extends Logging {
       }
     }
   }
+
+  def runInNewThread(
+      threadName: String,
+      isDaemon: Boolean = true)(body: => Unit): Unit = {
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        body
+      }
+    }
+    thread.setDaemon(isDaemon)
+    
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
+    thread.start()
+  }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 0c180db72..97ab21998 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -48,6 +48,7 @@ class KyuubiOperationPerConnectionSuite extends 
WithKyuubiServer with HiveJDBCTe
   override protected val conf: KyuubiConf = {
     KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
       .set(SESSION_CONF_ADVISOR.key, classOf[TestSessionConfAdvisor].getName)
+      .set(KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT.key, "0")
   }
 
   test("KYUUBI #647 - async query causes engine crash") {

Reply via email to