This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aba64d  [KYUUBI #1785] Stop Discovery services properly
3aba64d is described below

commit 3aba64d9bb2657893c7cecbb83f6b1816e6768bd
Author: Kent Yao <[email protected]>
AuthorDate: Tue Jan 18 10:41:57 2022 +0800

    [KYUUBI #1785] Stop Discovery services properly
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Fix some issues:
    
    1. EngineServiceDiscovery calls deregister twice
    2. When the ZooKeeper server is LOST, the delay before stopping gracefully shall be
       calculated from the configured retry policy instead of maxRetries * maxRetryWait
         - with the engine's default N-times retry policy, this avoids caching engines
           unnecessarily after the whole test file has finished and the server has stopped
         - otherwise, we are very likely to hit the GA memory limits
    ```
         KyuubiOperationPerGroupSuite:
    ./build/mvn: line 99:  1725 Killed                  ${MVN_BIN} 
$MAVEN_CLI_OPTS "$"
    Error: Process completed with exit code 137.
    ```
    3. When LOST, we shall skip the connection retries, the znode closing, and the
       client closing
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1785 from yaooqinn/stop.
    
    Closes #1785
    
    afdd4b47 [Kent Yao] Stop Discovery services properly
    5efa493a [Kent Yao] Stop Discovery services properly
    45f9312e [Kent Yao] Stop Discovery services properly
    d899012e [Kent Yao] Stop Discovery services properly
    925ff3e9 [Kent Yao] Stop Discovery services properly
    1f766116 [Kent Yao] Stop Discovery services properly
    0d25f6a1 [Kent Yao] Stop Discovery services properly
    ff833751 [Kent Yao] Stop Discovery services properly
    55d64c02 [Kent Yao] Stop Discovery services properly
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  2 +-
 .../kyuubi/ha/client/EngineServiceDiscovery.scala  | 30 ++++----
 .../kyuubi/ha/client/KyuubiServiceDiscovery.scala  | 13 +++-
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala | 14 ++--
 .../kyuubi/ha/client/ZooKeeperClientProvider.scala | 23 +++++++
 .../client/zookeeper/ServiceDiscoveryClient.scala  | 25 +++----
 .../kyuubi/ha/client/ServiceDiscoverySuite.scala   | 79 ++++++++++++++++++----
 .../ha/client/ZooKeeperClientProviderSuite.scala   | 51 ++++++++++++++
 .../scala/org/apache/kyuubi/WithKyuubiServer.scala |  4 +-
 9 files changed, 191 insertions(+), 50 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8597a52..9cf83e7 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -649,7 +649,7 @@ object KyuubiConf {
     .doc("The check interval for engine timeout")
     .version("1.0.0")
     .timeConf
-    .checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
+    .checkValue(_ >= Duration.ofSeconds(1).toMillis, "Minimum 1 seconds")
     .createWithDefault(Duration.ofMinutes(1).toMillis)
 
   val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] = 
buildConf("session.engine.idle.timeout")
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index b9d5eba..98127ed 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -31,21 +31,27 @@ class EngineServiceDiscovery(
     fe: FrontendService) extends ServiceDiscovery("EngineServiceDiscovery", 
fe) {
 
   override def stop(): Unit = synchronized {
-    discoveryClient.deregisterService()
-    conf.get(ENGINE_SHARE_LEVEL) match {
-      // For connection level, we should clean up the namespace in zk in case 
the disk stress.
-      case "CONNECTION" =>
-        try {
-          if (discoveryClient.postDeregisterService) {
-            info("Clean up discovery service due to this is connection share 
level.")
+    if (!isServerLost.get()) {
+      discoveryClient.deregisterService()
+      conf.get(ENGINE_SHARE_LEVEL) match {
+        // For connection level, we should clean up the namespace in zk in 
case the disk stress.
+        case "CONNECTION" =>
+          try {
+            if (discoveryClient.postDeregisterService) {
+              info("Clean up discovery service due to this is connection share 
level.")
+            }
+          } catch {
+            case NonFatal(e) =>
+              warn("Failed to clean up Spark engine before stop.", e)
           }
-        } catch {
-          case NonFatal(e) =>
-            warn("Failed to clean up Spark engine before stop.", e)
-        }
 
-      case _ =>
+        case _ =>
+      }
+      discoveryClient.closeClient()
+    } else {
+      warn(s"The Zookeeper ensemble is LOST")
     }
     super.stop()
   }
+
 }
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
index 6de2493..c2727a4 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala
@@ -26,4 +26,15 @@ import org.apache.kyuubi.service.FrontendService
  * @param fe the frontend service to publish for service discovery
  */
 class KyuubiServiceDiscovery(
-    fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery", fe)
+    fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery", 
fe) {
+
+  override def stop(): Unit = synchronized {
+    if (!isServerLost.get()) {
+      discoveryClient.deregisterService()
+      discoveryClient.closeClient()
+    } else {
+      warn(s"The Zookeeper ensemble is LOST")
+    }
+    super.stop()
+  }
+}
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 0bb5aa1..1723561 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.ha.client
 
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
@@ -42,6 +43,8 @@ abstract class ServiceDiscovery(
     name: String,
     val fe: FrontendService) extends AbstractService(name) {
 
+  protected val isServerLost = new AtomicBoolean(false)
+
   private var _discoveryClient: ServiceDiscoveryClient = _
 
   def discoveryClient: ServiceDiscoveryClient = _discoveryClient
@@ -60,20 +63,15 @@ abstract class ServiceDiscovery(
     super.start()
   }
 
-  override def stop(): Unit = {
-    discoveryClient.closeClient()
-    super.stop()
-  }
-
   // stop the server genteelly
-  def stopGracefully(): Unit = {
-    stop()
+  def stopGracefully(isLost: Boolean = false): Unit = {
     while (fe.be != null && fe.be.sessionManager.getOpenSessionCount > 0) {
+      debug(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are 
active, delay shutdown")
       Thread.sleep(1000 * 60)
     }
+    isServerLost.set(isLost)
     fe.serverable.stop()
   }
-
 }
 
 object ServiceDiscovery extends Logging {
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
index 24f372c..1c6d185 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.ha.client
 import java.io.{File, IOException}
 import javax.security.auth.login.Configuration
 
+import scala.util.Random
+
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry._
 import org.apache.hadoop.security.UserGroupInformation
@@ -71,6 +73,27 @@ object ZooKeeperClientProvider extends Logging {
     builder.build()
   }
 
+  /**
+   * Estimates how long the configured Curator retry policy keeps reconnecting, computed in
+   * Long millis so that large retry settings cannot overflow Int, before stopping gracefully.
+   */
+  def getGracefulStopThreadDelay(conf: KyuubiConf): Long = {
+    val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT).toLong
+    val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT).toLong
+    val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
+    // Curator caps exponential retries at 29; `1 << 31` would also make nextInt throw
+    val expRetries = Math.min(maxRetries, 29)
+    def expWait(n: Int): Long = baseSleepTime * Math.max(1, Random.nextInt(1 << (n + 1)))
+    RetryPolicies.withName(conf.get(HA_ZK_CONN_RETRY_POLICY)) match {
+      case ONE_TIME => baseSleepTime
+      case N_TIME => maxRetries * baseSleepTime
+      case BOUNDED_EXPONENTIAL_BACKOFF =>
+        (0 until expRetries).map(n => Math.min(expWait(n), maxSleepTime)).sum
+      case UNTIL_ELAPSED => maxSleepTime
+      case EXPONENTIAL_BACKOFF => (0 until expRetries).map(expWait).sum
+    }
+  }
+
   /**
    * Creates a zookeeper client before calling `f` and close it after calling 
`f`.
    */
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
index f1bf4f1..98efe23 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
@@ -41,14 +41,12 @@ import org.apache.kyuubi.KYUUBI_VERSION
 import org.apache.kyuubi.KyuubiException
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
 import org.apache.kyuubi.ha.client.ServiceDiscovery
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.buildZookeeperClient
+import 
org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient, 
getGracefulStopThreadDelay}
 import 
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
 import 
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
 import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -69,8 +67,6 @@ class ServiceDiscoveryClient(serviceDiscovery: 
ServiceDiscovery) extends Logging
 
   def createClient(conf: KyuubiConf): Unit = {
     _namespace = conf.get(HA_ZK_NAMESPACE)
-    val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
-    val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
     zkClient = buildZookeeperClient(conf)
     zkClient.getConnectionStateListenable.addListener(new 
ConnectionStateListener {
       private val isConnected = new AtomicBoolean(false)
@@ -81,13 +77,13 @@ class ServiceDiscoveryClient(serviceDiscovery: 
ServiceDiscovery) extends Logging
           case CONNECTED | RECONNECTED => isConnected.set(true)
           case LOST =>
             isConnected.set(false)
-            val delay = maxRetries.toLong * maxSleepTime
+            val delay = getGracefulStopThreadDelay(conf)
             connectionChecker.schedule(
               new Runnable {
                 override def run(): Unit = if (!isConnected.get()) {
                   error(s"Zookeeper client connection state changed to: 
$newState, but failed to" +
-                    s" reconnect in ${delay / 1000} seconds. Give up retry. ")
-                  serviceDiscovery.stopGracefully()
+                    s" reconnect in ${delay / 1000} seconds. Give up retry and 
stop gracefully . ")
+                  serviceDiscovery.stopGracefully(true)
                 }
               },
               delay,
@@ -120,7 +116,7 @@ class ServiceDiscoveryClient(serviceDiscovery: 
ServiceDiscovery) extends Logging
       try {
         serviceNode.close()
       } catch {
-        case e: IOException =>
+        case e @ (_: IOException | _: KeeperException) =>
           error("Failed to close the persistent ephemeral znode" + 
serviceNode.getActualPath, e)
       } finally {
         serviceNode = null
@@ -130,15 +126,20 @@ class ServiceDiscoveryClient(serviceDiscovery: 
ServiceDiscovery) extends Logging
 
   def postDeregisterService(): Boolean = {
     if (namespace != null) {
-      zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
-      true
+      try {
+        zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
+        true
+      } catch {
+        case e: KeeperException =>
+          warn(s"Failed to delete $namespace", e)
+          false
+      }
     } else {
       false
     }
   }
 
   def closeClient(): Unit = {
-    deregisterService()
     if (zkClient != null) zkClient.close()
   }
 
diff --git 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
index c1240fd..4be1de5 100644
--- 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++ 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.ha.client
 import java.io.{File, IOException}
 import java.net.InetAddress
 import java.util
+import java.util.concurrent.atomic.AtomicBoolean
 import javax.security.auth.login.Configuration
 
 import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.service.{NoopTBinaryFrontendServer, Serverable, 
ServiceState}
+import org.apache.kyuubi.service._
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
 class ServiceDiscoverySuite extends KerberizedTestHelper {
@@ -66,17 +67,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
       .set(HA_ZK_NAMESPACE, namespace)
       .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
-    val server: Serverable = new NoopTBinaryFrontendServer()
+    var serviceDiscovery: KyuubiServiceDiscovery = null
+    val server: Serverable = new NoopTBinaryFrontendServer() {
+      override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+        new NoopTBinaryFrontendService(this) {
+          override val discoveryService: Option[Service] = {
+            serviceDiscovery = new KyuubiServiceDiscovery(this)
+            Some(serviceDiscovery)
+          }
+        })
+    }
     server.initialize(conf)
     server.start()
-
     val znodeRoot = s"/$namespace"
-    val serviceDiscovery = new 
KyuubiServiceDiscovery(server.frontendServices.head)
     withZkClient(conf) { framework =>
       try {
-        serviceDiscovery.initialize(conf)
-        serviceDiscovery.start()
-
         assert(framework.checkExists().forPath("/abc") === null)
         assert(framework.checkExists().forPath(znodeRoot) !== null)
         val children = framework.getChildren.forPath(znodeRoot).asScala
@@ -87,13 +92,12 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
         children.foreach { child =>
           framework.delete().forPath(s"""$znodeRoot/$child""")
         }
-        eventually(timeout(5.seconds), interval(1.second)) {
+        eventually(timeout(5.seconds), interval(100.millis)) {
           assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
           assert(server.getServiceState === ServiceState.STOPPED)
         }
       } finally {
         server.stop()
-        serviceDiscovery.stop()
       }
     }
   }
@@ -165,16 +169,22 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
         .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
         .set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
 
-      val server: Serverable = new NoopTBinaryFrontendServer()
+      var serviceDiscovery: KyuubiServiceDiscovery = null
+      val server: Serverable = new NoopTBinaryFrontendServer() {
+        override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+          new NoopTBinaryFrontendService(this) {
+            override val discoveryService: Option[Service] = {
+              serviceDiscovery = new KyuubiServiceDiscovery(this)
+              Some(serviceDiscovery)
+            }
+          })
+      }
       server.initialize(conf)
       server.start()
 
       val znodeRoot = s"/$namespace"
-      val serviceDiscovery = new 
EngineServiceDiscovery(server.frontendServices.head)
       withZkClient(conf) { framework =>
         try {
-          serviceDiscovery.initialize(conf)
-          serviceDiscovery.start()
 
           assert(framework.checkExists().forPath("/abc") === null)
           assert(framework.checkExists().forPath(znodeRoot) !== null)
@@ -186,7 +196,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
           children.foreach { child =>
             framework.delete().forPath(s"""$znodeRoot/$child""")
           }
-          eventually(timeout(5.seconds), interval(1.second)) {
+          eventually(timeout(5.seconds), interval(100.millis)) {
             assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
             assert(server.getServiceState === ServiceState.STOPPED)
             val msg = s"This Kyuubi instance 
${server.frontendServices.head.connectionUrl}" +
@@ -217,4 +227,45 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
     assert(host === host2)
     assert(port === port2)
   }
+
+  test("stop engine in time while zk ensemble terminates") {
+    val zkServer = new EmbeddedZookeeper()
+    val conf = KyuubiConf()
+      .set(ZookeeperConf.ZK_CLIENT_PORT, 0)
+    try {
+      zkServer.initialize(conf)
+      zkServer.start()
+      var serviceDiscovery: EngineServiceDiscovery = null
+      val server = new NoopTBinaryFrontendServer() {
+        override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+          new NoopTBinaryFrontendService(this) {
+            override val discoveryService: Option[Service] = {
+              serviceDiscovery = new EngineServiceDiscovery(this)
+              Some(serviceDiscovery)
+            }
+          })
+      }
+      conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+        .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+        .set(HA_ZK_QUORUM, zkServer.getConnectString)
+        .set(HA_ZK_SESSION_TIMEOUT, 2000)
+        .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+      server.initialize(conf)
+      server.start()
+      assert(server.getServiceState === ServiceState.STARTED)
+
+      zkServer.stop()
+      val isServerLostM = 
serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
+      isServerLostM.setAccessible(true)
+      val isServerLost = isServerLostM.get(serviceDiscovery)
+
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(isServerLost.asInstanceOf[AtomicBoolean].get())
+        assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
+        assert(server.getServiceState === ServiceState.STOPPED)
+      }
+    } finally {
+      zkServer.stop()
+    }
+  }
 }
diff --git 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
new file mode 100644
index 0000000..97cbdf8
--- /dev/null
+++ 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_CONN_BASE_RETRY_WAIT, 
HA_ZK_CONN_MAX_RETRIES, HA_ZK_CONN_MAX_RETRY_WAIT, HA_ZK_CONN_RETRY_POLICY}
+
+class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
+
+  test("get graceful stop thread start delay") {
+    val conf = KyuubiConf()
+
+    val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
+    val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
+    val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
+    val delay1 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    assert(delay1 >= baseSleepTime * maxRetries)
+
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+    val delay2 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    assert(delay2 === baseSleepTime)
+
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "N_TIME")
+    val delay3 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    assert(delay3 === baseSleepTime * maxRetries)
+
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "UNTIL_ELAPSED")
+    val delay4 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    assert(delay4 === maxSleepTime)
+
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "BOUNDED_EXPONENTIAL_BACKOFF")
+    val delay5 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+    assert(delay5 >= baseSleepTime * maxRetries)
+  }
+}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index 0d12d45..38171ce 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -52,8 +52,8 @@ trait WithKyuubiServer extends KyuubiFunSuite {
 
     conf.set("spark.ui.enabled", "false")
     conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
-    conf.setIfMissing(ENGINE_CHECK_INTERVAL, 3000L)
-    conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 10000L)
+    conf.setIfMissing(ENGINE_CHECK_INTERVAL, 1000L)
+    conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 3000L)
     // TODO KYUUBI #745
     conf.setIfMissing(ENGINE_INIT_TIMEOUT, 300000L)
     server = KyuubiServer.startServer(conf)

Reply via email to