This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ffd8852b6 [KYUUBI #5002] Fail the engine fast when no incoming
connection in CONNECTION mode
ffd8852b6 is described below
commit ffd8852b608522ec3c5e0458b66a67787f778089
Author: Xieming LI <[email protected]>
AuthorDate: Sun Jul 16 23:00:16 2023 +0800
[KYUUBI #5002] Fail the engine fast when no incoming connection in
CONNECTION mode
### _Why are the changes needed?_
Please refer to #4997
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
1. Connect to KyuubiServer with beeline.
2. Confirm the application is ACCEPTED in the ResourceManager, then restart
KyuubiServer.
3. Confirm that the engine is terminated shortly afterwards.
```
23/06/28 10:44:59 INFO storage.BlockManagerMaster: Removed 1 successfully
in removeExecutor
23/06/28 10:45:00 INFO spark.SparkSQLEngine: Current open session is 0
23/06/28 10:45:00 ERROR spark.SparkSQLEngine: Spark engine has been
terminated because no incoming connection for more than 60000 ms, deregistering
from engine discovery space.
23/06/28 10:45:00 WARN zookeeper.ZookeeperDiscoveryClient: This Kyuubi
instance lniuhpi1616.nhnjp.ism:46588 is now de-registered from ZooKeeper. The
server will be shut down after the last client session completes.
23/06/28 10:45:00 INFO spark.SparkSQLEngine: Service:
[SparkTBinaryFrontend] is stopping.
```
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5002 from risyomei/feature/failfast.
Closes #5002
402d6c01f [Xieming LI] Changed runInNewThread based on comment
58f11e157 [Xieming LI] Changed runInNewThread to non-blocking
c6bb02d6a [Xieming LI] Fixed Unit Test
168d996d0 [Xieming LI] Start countdown after engine is started
48ee819f2 [Xieming LI] Fixed a typo
a8d305942 [Xieming LI] Using runInNewThread ported from Spark
21f0671df [Xieming LI] Updated document
a7d5d1082 [Xieming LI] Changed the default value to turn off this feature
437be512d [Xieming LI] Trigger CI to test agagin
42a847e84 [Xieming LI] Added Configuration for timeout, changed to
ThreadPoolExecutor
639bd5239 [Xieming LI] Fail the engine fast when no incoming connection in
CONNECTION mode
Authored-by: Xieming LI <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 1 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 28 ++++++++++++++++++++++
.../spark/EtcdShareLevelSparkEngineSuite.scala | 5 ++--
.../ZookeeperShareLevelSparkEngineSuite.scala | 2 ++
.../kyuubi/engine/spark/session/SessionSuite.scala | 4 +++-
.../org/apache/kyuubi/config/KyuubiConf.scala | 10 ++++++++
.../apache/kyuubi/util/NamedThreadFactory.scala | 2 +-
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 14 +++++++++++
.../KyuubiOperationPerConnectionSuite.scala | 1 +
9 files changed, 62 insertions(+), 5 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ca29592e2..2d2c589ae 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -413,6 +413,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.open.retry.wait | PT10S
| How long to wait before retrying to open the engine after failure.
[...]
| kyuubi.session.engine.share.level | USER
| (deprecated) - Using kyuubi.engine.share.level instead
[...]
| kyuubi.session.engine.spark.main.resource | <undefined>
| The package used to create Spark SQL engine remote application. If it is
undefined, Kyuubi will use the default
[...]
+| kyuubi.session.engine.spark.max.initial.wait | PT1M
| Max wait time for the initial connection to Spark engine. The engine will
self-terminate if no new incoming connection is established within this time. This
setting only applies at the CONNECTION share level. 0 or negative means not to
self-terminate.
[...]
| kyuubi.session.engine.spark.max.lifetime | PT0S
| Max lifetime for Spark engine, the engine will self-terminate when it
reaches the end of life. 0 or negative means not to self-terminate.
[...]
| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd
HH:mm:ss.SSS | The time format of the progress bar
[...]
| kyuubi.session.engine.spark.progress.update.interval | PT1S
| Update period of progress bar.
[...]
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index bdbc7c08f..b94367e9e 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -37,6 +37,7 @@ import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
+import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch,
currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore,
SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -80,6 +81,12 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
+
+ val maxInitTimeout = conf.get(ENGINE_SPARK_MAX_INITIAL_WAIT)
+ if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString &&
+ maxInitTimeout > 0) {
+ startFastFailChecker(maxInitTimeout)
+ }
}
override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -114,6 +121,27 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
stopEngineExec.get.execute(stopTask)
}
+  /**
+   * Starts a background thread that stops this engine when no session has been opened
+   * within `maxTimeout` milliseconds of this call.
+   *
+   * Nothing is stopped (and no error is logged) once the engine is already shutting down,
+   * so a stop triggered by another path while waiting does not produce a misleading log.
+   *
+   * @param maxTimeout how long to wait for the first session, in milliseconds
+   */
+  private[kyuubi] def startFastFailChecker(maxTimeout: Long): Unit = {
+    val pollIntervalMs = Duration(10, TimeUnit.SECONDS).toMillis
+    // Monotonic clock: the elapsed-time check must not be affected by wall-clock jumps.
+    val startNs = System.nanoTime()
+    def remainingMs: Long =
+      maxTimeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
+    def awaitingFirstSession: Boolean =
+      !shutdown.get && backendService.sessionManager.getOpenSessionCount <= 0
+
+    // Note: this only guards starting the thread; the body below runs on the new thread.
+    Utils.tryLogNonFatalError {
+      ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
+        var remaining = remainingMs
+        while (awaitingFirstSession && remaining > 0) {
+          info("Waiting for the initial connection")
+          // Never sleep past the deadline, so timeouts shorter than (or not a multiple of)
+          // the poll interval are honored promptly. `remaining` is > 0 here.
+          Thread.sleep(math.min(pollIntervalMs, remaining))
+          remaining = remainingMs
+        }
+        // Re-evaluates shutdown too: another path may have stopped the engine meanwhile.
+        if (awaitingFirstSession) {
+          error(s"Spark engine has been terminated because no incoming connection" +
+            s" for more than $maxTimeout ms, de-registering from engine discovery space.")
+          assert(currentEngine.isDefined)
+          currentEngine.get.stop()
+        }
+      }
+    }
+  }
+
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
index 46dc3b54c..727b232e3 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala
@@ -17,9 +17,7 @@
package org.apache.kyuubi.engine.spark
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL,
ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
@@ -30,6 +28,7 @@ trait EtcdShareLevelSparkEngineSuite
etcdConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+ ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
index 4ef96e61a..f24abb36c 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
@@ -30,6 +31,7 @@ trait ZookeeperShareLevelSparkEngineSuite
zookeeperConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
+ ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
index 5e0b6c28e..b89c560b3 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala
@@ -27,7 +27,9 @@ import org.apache.kyuubi.service.ServiceState._
class SessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] = {
- Map(ENGINE_SHARE_LEVEL.key -> "CONNECTION")
+ Map(
+ ENGINE_SHARE_LEVEL.key -> "CONNECTION",
+ ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0")
}
override protected def beforeEach(): Unit = {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 669f72da0..b8a75d27f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1282,6 +1282,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(0)
+  // Consulted by SparkSQLEngine only when the share level is CONNECTION; a positive
+  // value starts the fail-fast checker, 0 or negative disables it.
+  val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.spark.max.initial.wait")
+      .doc("Max wait time for the initial connection to Spark engine. The engine will" +
+        " self-terminate if no new incoming connection is established within this time." +
+        " This setting only applies at the CONNECTION share level." +
+        " 0 or negative means not to self-terminate.")
+      .version("1.8.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(60).toMillis)
+
val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote job. If it is
undefined," +
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
index 13127b59b..3ce421e23 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala
@@ -32,5 +32,5 @@ class NamedThreadFactory(name: String, daemon: Boolean)
extends ThreadFactory {
}
object NamedThreadFactory {
- private val kyuubiUncaughtExceptionHandler = new
KyuubiUncaughtExceptionHandler
+ private[util] val kyuubiUncaughtExceptionHandler = new
KyuubiUncaughtExceptionHandler
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 8ce4bb2e5..76d3f416f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -95,4 +95,18 @@ object ThreadUtils extends Logging {
}
}
}
+
+  /**
+   * Runs `body` on a freshly started thread named `threadName` and returns immediately,
+   * without waiting for it to finish. Exceptions escaping `body` are delivered to the
+   * shared Kyuubi uncaught-exception handler rather than to the caller.
+   *
+   * @param threadName name assigned to the new thread
+   * @param isDaemon   whether the new thread is a daemon thread (defaults to true)
+   * @param body       code executed on the new thread
+   */
+  def runInNewThread(
+      threadName: String,
+      isDaemon: Boolean = true)(body: => Unit): Unit = {
+    val task = new Runnable {
+      override def run(): Unit = body
+    }
+    val worker = new Thread(task, threadName)
+    worker.setDaemon(isDaemon)
+    worker.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
+    worker.start()
+  }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 0c180db72..97ab21998 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -48,6 +48,7 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
override protected val conf: KyuubiConf = {
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
.set(SESSION_CONF_ADVISOR.key, classOf[TestSessionConfAdvisor].getName)
+ .set(KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT.key, "0")
}
test("KYUUBI #647 - async query causes engine crash") {