This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3aba64d [KYUUBI #1785] Stop Discovery services properly
3aba64d is described below
commit 3aba64d9bb2657893c7cecbb83f6b1816e6768bd
Author: Kent Yao <[email protected]>
AuthorDate: Tue Jan 18 10:41:57 2022 +0800
[KYUUBI #1785] Stop Discovery services properly
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Fix some issues:
1. EngineServiceDiscovery calls deregister twice
2. When the ZooKeeper connection is LOST, the delay before stopping gracefully shall be
calculated from the retry policy, not as maxRetries * maxRetryWait
- engines use the N_TIME retry policy by default, so we can avoid unnecessarily
keeping engines cached after the whole test file has finished and the server has stopped
- otherwise, we are very likely to hit the GA memory limits
```
KyuubiOperationPerGroupSuite:
./build/mvn: line 99: 1725 Killed ${MVN_BIN} $MAVEN_CLI_OPTS "$@"
Error: Process completed with exit code 137.
```
3. When the connection is LOST, we shall skip connection retries, closing the znode,
and closing the client
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1785 from yaooqinn/stop.
Closes #1785
afdd4b47 [Kent Yao] Stop Discovery services properly
5efa493a [Kent Yao] Stop Discovery services properly
45f9312e [Kent Yao] Stop Discovery services properly
d899012e [Kent Yao] Stop Discovery services properly
925ff3e9 [Kent Yao] Stop Discovery services properly
1f766116 [Kent Yao] Stop Discovery services properly
0d25f6a1 [Kent Yao] Stop Discovery services properly
ff833751 [Kent Yao] Stop Discovery services properly
55d64c02 [Kent Yao] Stop Discovery services properly
Authored-by: Kent Yao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../org/apache/kyuubi/config/KyuubiConf.scala | 2 +-
.../kyuubi/ha/client/EngineServiceDiscovery.scala | 30 ++++----
.../kyuubi/ha/client/KyuubiServiceDiscovery.scala | 13 +++-
.../apache/kyuubi/ha/client/ServiceDiscovery.scala | 14 ++--
.../kyuubi/ha/client/ZooKeeperClientProvider.scala | 23 +++++++
.../client/zookeeper/ServiceDiscoveryClient.scala | 25 +++----
.../kyuubi/ha/client/ServiceDiscoverySuite.scala | 79 ++++++++++++++++++----
.../ha/client/ZooKeeperClientProviderSuite.scala | 51 ++++++++++++++
.../scala/org/apache/kyuubi/WithKyuubiServer.scala | 4 +-
9 files changed, 191 insertions(+), 50 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8597a52..9cf83e7 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -649,7 +649,7 @@ object KyuubiConf {
.doc("The check interval for engine timeout")
.version("1.0.0")
.timeConf
- .checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
+ .checkValue(_ >= Duration.ofSeconds(1).toMillis, "Minimum 1 seconds")
.createWithDefault(Duration.ofMinutes(1).toMillis)
val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("session.engine.idle.timeout")
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index b9d5eba..98127ed 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -31,21 +31,27 @@ class EngineServiceDiscovery(
fe: FrontendService) extends ServiceDiscovery("EngineServiceDiscovery",
fe) {
override def stop(): Unit = synchronized {
- discoveryClient.deregisterService()
- conf.get(ENGINE_SHARE_LEVEL) match {
- // For connection level, we should clean up the namespace in zk in case
the disk stress.
- case "CONNECTION" =>
- try {
- if (discoveryClient.postDeregisterService) {
- info("Clean up discovery service due to this is connection share
level.")
+ if (!isServerLost.get()) {
+ discoveryClient.deregisterService()
+ conf.get(ENGINE_SHARE_LEVEL) match {
+ // For connection level, we should clean up the namespace in zk in
case the disk stress.
+ case "CONNECTION" =>
+ try {
+ if (discoveryClient.postDeregisterService) {
+ info("Clean up discovery service due to this is connection share
level.")
+ }
+ } catch {
+ case NonFatal(e) =>
+ warn("Failed to clean up Spark engine before stop.", e)
}
- } catch {
- case NonFatal(e) =>
- warn("Failed to clean up Spark engine before stop.", e)
- }
- case _ =>
+ case _ =>
+ }
+ discoveryClient.closeClient()
+ } else {
+ warn(s"The Zookeeper ensemble is LOST")
}
super.stop()
}
+
}
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
index 6de2493..c2727a4 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
@@ -26,4 +26,15 @@ import org.apache.kyuubi.service.FrontendService
* @param fe the frontend service to publish for service discovery
*/
class KyuubiServiceDiscovery(
- fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery", fe)
+ fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery",
fe) {
+
+ override def stop(): Unit = synchronized {
+ if (!isServerLost.get()) {
+ discoveryClient.deregisterService()
+ discoveryClient.closeClient()
+ } else {
+ warn(s"The Zookeeper ensemble is LOST")
+ }
+ super.stop()
+ }
+}
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 0bb5aa1..1723561 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.ha.client
import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
@@ -42,6 +43,8 @@ abstract class ServiceDiscovery(
name: String,
val fe: FrontendService) extends AbstractService(name) {
+ protected val isServerLost = new AtomicBoolean(false)
+
private var _discoveryClient: ServiceDiscoveryClient = _
def discoveryClient: ServiceDiscoveryClient = _discoveryClient
@@ -60,20 +63,15 @@ abstract class ServiceDiscovery(
super.start()
}
- override def stop(): Unit = {
- discoveryClient.closeClient()
- super.stop()
- }
-
// stop the server genteelly
- def stopGracefully(): Unit = {
- stop()
+ def stopGracefully(isLost: Boolean = false): Unit = {
while (fe.be != null && fe.be.sessionManager.getOpenSessionCount > 0) {
+ debug(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are
active, delay shutdown")
Thread.sleep(1000 * 60)
}
+ isServerLost.set(isLost)
fe.serverable.stop()
}
-
}
object ServiceDiscovery extends Logging {
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
index 24f372c..1c6d185 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import javax.security.auth.login.Configuration
+import scala.util.Random
+
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry._
import org.apache.hadoop.security.UserGroupInformation
@@ -71,6 +73,27 @@ object ZooKeeperClientProvider extends Logging {
builder.build()
}
+ def getGracefulStopThreadDelay(conf: KyuubiConf): Long = {
+ val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
+ val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
+ val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
+ val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)
+ RetryPolicies.withName(retryPolicyName) match {
+ case ONE_TIME => baseSleepTime
+ case N_TIME => maxRetries * baseSleepTime
+ case BOUNDED_EXPONENTIAL_BACKOFF =>
+ (0 until maxRetries).map { retryCount =>
+ val retryWait = baseSleepTime * Math.max(1, Random.nextInt(1 <<
(retryCount + 1)))
+ Math.min(retryWait, maxSleepTime)
+ }.sum
+ case UNTIL_ELAPSED => maxSleepTime
+ case EXPONENTIAL_BACKOFF =>
+ (0 until maxRetries).map { retryCount =>
+ baseSleepTime * Math.max(1, Random.nextInt(1 << (retryCount + 1)))
+ }.sum
+ }
+ }
+
/**
* Creates a zookeeper client before calling `f` and close it after calling
`f`.
*/
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
index f1bf4f1..98efe23 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
@@ -41,14 +41,12 @@ import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
import org.apache.kyuubi.ha.client.ServiceDiscovery
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.buildZookeeperClient
+import
org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient,
getGracefulStopThreadDelay}
import
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
import
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -69,8 +67,6 @@ class ServiceDiscoveryClient(serviceDiscovery:
ServiceDiscovery) extends Logging
def createClient(conf: KyuubiConf): Unit = {
_namespace = conf.get(HA_ZK_NAMESPACE)
- val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
- val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new
ConnectionStateListener {
private val isConnected = new AtomicBoolean(false)
@@ -81,13 +77,13 @@ class ServiceDiscoveryClient(serviceDiscovery:
ServiceDiscovery) extends Logging
case CONNECTED | RECONNECTED => isConnected.set(true)
case LOST =>
isConnected.set(false)
- val delay = maxRetries.toLong * maxSleepTime
+ val delay = getGracefulStopThreadDelay(conf)
connectionChecker.schedule(
new Runnable {
override def run(): Unit = if (!isConnected.get()) {
error(s"Zookeeper client connection state changed to:
$newState, but failed to" +
- s" reconnect in ${delay / 1000} seconds. Give up retry. ")
- serviceDiscovery.stopGracefully()
+ s" reconnect in ${delay / 1000} seconds. Give up retry and
stop gracefully . ")
+ serviceDiscovery.stopGracefully(true)
}
},
delay,
@@ -120,7 +116,7 @@ class ServiceDiscoveryClient(serviceDiscovery:
ServiceDiscovery) extends Logging
try {
serviceNode.close()
} catch {
- case e: IOException =>
+ case e @ (_: IOException | _: KeeperException) =>
error("Failed to close the persistent ephemeral znode" +
serviceNode.getActualPath, e)
} finally {
serviceNode = null
@@ -130,15 +126,20 @@ class ServiceDiscoveryClient(serviceDiscovery:
ServiceDiscovery) extends Logging
def postDeregisterService(): Boolean = {
if (namespace != null) {
- zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
- true
+ try {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
+ true
+ } catch {
+ case e: KeeperException =>
+ warn(s"Failed to delete $namespace", e)
+ false
+ }
} else {
false
}
}
def closeClient(): Unit = {
- deregisterService()
if (zkClient != null) zkClient.close()
}
diff --git
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
index c1240fd..4be1de5 100644
---
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.ha.client
import java.io.{File, IOException}
import java.net.InetAddress
import java.util
+import java.util.concurrent.atomic.AtomicBoolean
import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.service.{NoopTBinaryFrontendServer, Serverable,
ServiceState}
+import org.apache.kyuubi.service._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
class ServiceDiscoverySuite extends KerberizedTestHelper {
@@ -66,17 +67,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.set(HA_ZK_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- val server: Serverable = new NoopTBinaryFrontendServer()
+ var serviceDiscovery: KyuubiServiceDiscovery = null
+ val server: Serverable = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ serviceDiscovery = new KyuubiServiceDiscovery(this)
+ Some(serviceDiscovery)
+ }
+ })
+ }
server.initialize(conf)
server.start()
-
val znodeRoot = s"/$namespace"
- val serviceDiscovery = new
KyuubiServiceDiscovery(server.frontendServices.head)
withZkClient(conf) { framework =>
try {
- serviceDiscovery.initialize(conf)
- serviceDiscovery.start()
-
assert(framework.checkExists().forPath("/abc") === null)
assert(framework.checkExists().forPath(znodeRoot) !== null)
val children = framework.getChildren.forPath(znodeRoot).asScala
@@ -87,13 +92,12 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
}
- eventually(timeout(5.seconds), interval(1.second)) {
+ eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
assert(server.getServiceState === ServiceState.STOPPED)
}
} finally {
server.stop()
- serviceDiscovery.stop()
}
}
}
@@ -165,16 +169,22 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
- val server: Serverable = new NoopTBinaryFrontendServer()
+ var serviceDiscovery: KyuubiServiceDiscovery = null
+ val server: Serverable = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ serviceDiscovery = new KyuubiServiceDiscovery(this)
+ Some(serviceDiscovery)
+ }
+ })
+ }
server.initialize(conf)
server.start()
val znodeRoot = s"/$namespace"
- val serviceDiscovery = new
EngineServiceDiscovery(server.frontendServices.head)
withZkClient(conf) { framework =>
try {
- serviceDiscovery.initialize(conf)
- serviceDiscovery.start()
assert(framework.checkExists().forPath("/abc") === null)
assert(framework.checkExists().forPath(znodeRoot) !== null)
@@ -186,7 +196,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
}
- eventually(timeout(5.seconds), interval(1.second)) {
+ eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
assert(server.getServiceState === ServiceState.STOPPED)
val msg = s"This Kyuubi instance
${server.frontendServices.head.connectionUrl}" +
@@ -217,4 +227,45 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
assert(host === host2)
assert(port === port2)
}
+
+ test("stop engine in time while zk ensemble terminates") {
+ val zkServer = new EmbeddedZookeeper()
+ val conf = KyuubiConf()
+ .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
+ try {
+ zkServer.initialize(conf)
+ zkServer.start()
+ var serviceDiscovery: EngineServiceDiscovery = null
+ val server = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ serviceDiscovery = new EngineServiceDiscovery(this)
+ Some(serviceDiscovery)
+ }
+ })
+ }
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+ .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+ .set(HA_ZK_QUORUM, zkServer.getConnectString)
+ .set(HA_ZK_SESSION_TIMEOUT, 2000)
+ .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ server.initialize(conf)
+ server.start()
+ assert(server.getServiceState === ServiceState.STARTED)
+
+ zkServer.stop()
+ val isServerLostM =
serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
+ isServerLostM.setAccessible(true)
+ val isServerLost = isServerLostM.get(serviceDiscovery)
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(isServerLost.asInstanceOf[AtomicBoolean].get())
+ assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+ assert(server.getServiceState === ServiceState.STOPPED)
+ }
+ } finally {
+ zkServer.stop()
+ }
+ }
}
diff --git
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
new file mode 100644
index 0000000..97cbdf8
--- /dev/null
+++
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_CONN_BASE_RETRY_WAIT,
HA_ZK_CONN_MAX_RETRIES, HA_ZK_CONN_MAX_RETRY_WAIT, HA_ZK_CONN_RETRY_POLICY}
+
+class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
+
+ test("get graceful stop thread start delay") {
+ val conf = KyuubiConf()
+
+ val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
+ val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
+ val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
+ val delay1 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ assert(delay1 >= baseSleepTime * maxRetries)
+
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+ val delay2 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ assert(delay2 === baseSleepTime)
+
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "N_TIME")
+ val delay3 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ assert(delay3 === baseSleepTime * maxRetries)
+
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "UNTIL_ELAPSED")
+ val delay4 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ assert(delay4 === maxSleepTime)
+
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "BOUNDED_EXPONENTIAL_BACKOFF")
+ val delay5 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ assert(delay5 >= baseSleepTime * maxRetries)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index 0d12d45..38171ce 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -52,8 +52,8 @@ trait WithKyuubiServer extends KyuubiFunSuite {
conf.set("spark.ui.enabled", "false")
conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
- conf.setIfMissing(ENGINE_CHECK_INTERVAL, 3000L)
- conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 10000L)
+ conf.setIfMissing(ENGINE_CHECK_INTERVAL, 1000L)
+ conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 3000L)
// TODO KYUUBI #745
conf.setIfMissing(ENGINE_INIT_TIMEOUT, 300000L)
server = KyuubiServer.startServer(conf)