This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0441c  [KYUUBI #1446] Decouple zookeeper from abstract ServiceDiscovery
ff0441c is described below

commit ff0441c1bedb8df949008ca1510d3189d3c8f4d3
Author: hongdongdong <[email protected]>
AuthorDate: Tue Nov 30 17:16:28 2021 +0800

    [KYUUBI #1446] Decouple zookeeper from abstract ServiceDiscovery
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
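    Decouple the ZooKeeper-specific logic from the abstract ServiceDiscovery service.
    The ZooKeeper client setup, connection-state handling, and service-node
    registration/deregistration move into a new
    org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient, and
    ServiceDiscovery now delegates to it.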
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1448 from hddong/decouple-zeekeeper.
    
    Closes #1446
    
    0294c3c3 [hongdongdong] remove
    5da2f944 [hongdongdong] fix
    be066f64 [hongdongdong] fix
    03038558 [hongdongdong] [KYUUBI #1446] Decouple zookeeper from abstract ServiceDiscovery
    
    Authored-by: hongdongdong <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../kyuubi/ha/client/EngineServiceDiscovery.scala  |   9 +-
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala | 195 ++-----------------
 .../ServiceDiscoveryClient.scala}                  | 212 ++++++---------------
 3 files changed, 78 insertions(+), 338 deletions(-)

diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index 96c7e97..b9d5eba 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -31,13 +31,14 @@ class EngineServiceDiscovery(
     fe: FrontendService) extends ServiceDiscovery("EngineServiceDiscovery", 
fe) {
 
   override def stop(): Unit = synchronized {
-    closeServiceNode()
+    discoveryClient.deregisterService()
     conf.get(ENGINE_SHARE_LEVEL) match {
       // For connection level, we should clean up the namespace in zk in case 
the disk stress.
-      case "CONNECTION" if namespace != null =>
+      case "CONNECTION" =>
         try {
-          zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
-          info("Clean up discovery service due to this is connection share 
level.")
+          if (discoveryClient.postDeregisterService) {
+            info("Clean up discovery service due to this is connection share 
level.")
+          }
         } catch {
           case NonFatal(e) =>
             warn("Failed to clean up Spark engine before stop.", e)
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 7246bdf..13b0dd1 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -17,28 +17,20 @@
 
 package org.apache.kyuubi.ha.client
 
-import java.io.IOException
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.{ConnectionState, 
ConnectionStateListener}
-import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, 
RECONNECTED}
 import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, 
Watcher}
-import org.apache.zookeeper.CreateMode.PERSISTENT
-import org.apache.zookeeper.KeeperException.NodeExistsException
 
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
+import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient
+import 
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
 import org.apache.kyuubi.service.{AbstractService, FrontendService}
-import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
 
 /**
  * A abstract service for service discovery
@@ -48,92 +40,31 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils, 
ThreadUtils}
  */
 abstract class ServiceDiscovery(
     name: String,
-    fe: FrontendService) extends AbstractService(name) {
+    val fe: FrontendService) extends AbstractService(name) {
 
-  import ServiceDiscovery._
-  import ZooKeeperClientProvider._
+  private var _discoveryClient: ServiceDiscoveryClient = _
 
-  private var _zkClient: CuratorFramework = _
-  private var _serviceNode: PersistentNode = _
-
-  /**
-   * a pre-defined namespace used to publish the instance of the associate 
service
-   */
-  private var _namespace: String = _
-
-  def zkClient: CuratorFramework = _zkClient
-
-  def serviceNode: PersistentNode = _serviceNode
-
-  def namespace: String = _namespace
+  def discoveryClient: ServiceDiscoveryClient = _discoveryClient
 
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
-    _namespace = conf.get(HA_ZK_NAMESPACE)
-    val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
-    val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
-    _zkClient = buildZookeeperClient(conf)
-    zkClient.getConnectionStateListenable.addListener(new 
ConnectionStateListener {
-      private val isConnected = new AtomicBoolean(false)
 
-      override def stateChanged(client: CuratorFramework, newState: 
ConnectionState): Unit = {
-        info(s"Zookeeper client connection state changed to: $newState")
-        newState match {
-          case CONNECTED | RECONNECTED => isConnected.set(true)
-          case LOST =>
-            isConnected.set(false)
-            val delay = maxRetries.toLong * maxSleepTime
-            connectionChecker.schedule(
-              new Runnable {
-                override def run(): Unit = if (!isConnected.get()) {
-                  error(s"Zookeeper client connection state changed to: 
$newState, but failed to" +
-                    s" reconnect in ${delay / 1000} seconds. Give up retry. ")
-                  stopGracefully()
-                }
-              },
-              delay,
-              TimeUnit.MILLISECONDS)
-          case _ =>
-        }
-      }
-    })
-    zkClient.start()
+    _discoveryClient = new ServiceDiscoveryClient(this)
+    discoveryClient.createClient(conf)
+
     super.initialize(conf)
   }
 
   override def start(): Unit = {
-    val instance = fe.connectionUrl
-    _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
-    // Set a watch on the serviceNode
-    val watcher = new DeRegisterWatcher
-    if 
(zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) 
== null) {
-      // No node exists, throw exception
-      throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
-        s"instance[${fe.connectionUrl}] on ZooKeeper.")
-    }
+    discoveryClient.registerService(conf)
     super.start()
   }
 
   override def stop(): Unit = {
-    closeServiceNode()
-    if (zkClient != null) zkClient.close()
+    discoveryClient.closeClient()
     super.stop()
   }
 
-  // close the EPHEMERAL_SEQUENTIAL node in zk
-  protected def closeServiceNode(): Unit = {
-    if (_serviceNode != null) {
-      try {
-        _serviceNode.close()
-      } catch {
-        case e: IOException =>
-          error("Failed to close the persistent ephemeral znode" + 
serviceNode.getActualPath, e)
-      } finally {
-        _serviceNode = null
-      }
-    }
-  }
-
   // stop the server genteelly
   def stopGracefully(): Unit = {
     stop()
@@ -143,23 +74,10 @@ abstract class ServiceDiscovery(
     fe.serverable.stop()
   }
 
-  class DeRegisterWatcher extends Watcher {
-    override def process(event: WatchedEvent): Unit = {
-      if (event.getType == Watcher.Event.EventType.NodeDeleted) {
-        warn(s"This Kyuubi instance ${fe.connectionUrl} is now de-registered 
from" +
-          s" ZooKeeper. The server will be shut down after the last client 
session completes.")
-        stopGracefully()
-      }
-    }
-  }
-
 }
 
 object ServiceDiscovery extends Logging {
 
-  final private lazy val connectionChecker =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
-
   def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
     val zkEnsemble = conf.get(HA_ZK_QUORUM)
     zkEnsemble != null && zkEnsemble.nonEmpty
@@ -234,97 +152,6 @@ object ServiceDiscovery extends Logging {
       external: Boolean = false): String = {
     createServiceNode(conf, zkClient, namespace, instance, version, 
external).getActualPath
   }
-
-  private def createServiceNode(
-      conf: KyuubiConf,
-      zkClient: CuratorFramework,
-      namespace: String,
-      instance: String,
-      version: Option[String] = None,
-      external: Boolean = false): PersistentNode = {
-    val ns = ZKPaths.makePath(null, namespace)
-    try {
-      zkClient
-        .create()
-        .creatingParentsIfNeeded()
-        .withMode(PERSISTENT)
-        .forPath(ns)
-    } catch {
-      case _: NodeExistsException => // do nothing
-      case e: KeeperException =>
-        throw new KyuubiException(s"Failed to create namespace '$ns'", e)
-    }
-
-    val session = conf.get(HA_ZK_ENGINE_REF_ID)
-      .map(refId => s"refId=$refId;").getOrElse("")
-    val pathPrefix = ZKPaths.makePath(
-      namespace,
-      
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
-    var serviceNode: PersistentNode = null
-    val createMode =
-      if (external) CreateMode.PERSISTENT_SEQUENTIAL
-      else CreateMode.EPHEMERAL_SEQUENTIAL
-    val znodeData =
-      if (conf.get(HA_ZK_PUBLIST_CONFIGS) && session.isEmpty) {
-        addConfsToPublish(conf, instance)
-      } else {
-        instance
-      }
-    try {
-      serviceNode = new PersistentNode(
-        zkClient,
-        createMode,
-        false,
-        pathPrefix,
-        znodeData.getBytes(StandardCharsets.UTF_8))
-      serviceNode.start()
-      val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
-      if (!serviceNode.waitForInitialCreate(znodeTimeout, 
TimeUnit.MILLISECONDS)) {
-        throw new KyuubiException(s"Max znode creation wait time $znodeTimeout 
s exhausted")
-      }
-      info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for 
KyuubiServer uri: " + instance)
-    } catch {
-      case e: Exception =>
-        if (serviceNode != null) {
-          serviceNode.close()
-        }
-        throw new KyuubiException(
-          s"Unable to create a znode for this server instance: $instance",
-          e)
-    }
-    serviceNode
-  }
-
-  /**
-   * Refer to the implementation of HIVE-11581 to simplify user connection 
parameters.
-   * https://issues.apache.org/jira/browse/HIVE-11581
-   * HiveServer2 should store connection params in ZK
-   * when using dynamic service discovery for simpler client connection string.
-   */
-  private def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
-    if (!instance.contains(":")) {
-      return instance
-    }
-    val hostPort = instance.split(":", 2)
-    val confsToPublish = collection.mutable.Map[String, String]()
-
-    // Hostname
-    confsToPublish += ("hive.server2.thrift.bind.host" -> hostPort(0))
-    // Transport mode
-    confsToPublish += ("hive.server2.transport.mode" -> "binary")
-    // Transport specific confs
-    confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
-    confsToPublish += ("hive.server2.thrift.sasl.qop" -> 
conf.get(KyuubiConf.SASL_QOP))
-    // Auth specific confs
-    val authenticationMethod = 
conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
-    confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
-    if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
-      confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
-        
conf.get(KyuubiConf.SERVER_PRINCIPAL).map(KyuubiHadoopUtils.getServerPrincipal)
-          .getOrElse(""))
-    }
-    confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
-  }
 }
 
 case class ServiceNodeInfo(
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
similarity index 56%
copy from 
kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
copy to 
kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
index 7246bdf..74d6a59 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
@@ -15,64 +15,63 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
 
 import java.io.IOException
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.JavaConverters._
-
-import com.google.common.annotations.VisibleForTesting
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.{ConnectionState, 
ConnectionStateListener}
-import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST, 
RECONNECTED}
+import org.apache.curator.framework.state.ConnectionState
+import org.apache.curator.framework.state.ConnectionState.CONNECTED
+import org.apache.curator.framework.state.ConnectionState.LOST
+import org.apache.curator.framework.state.ConnectionState.RECONNECTED
+import org.apache.curator.framework.state.ConnectionStateListener
 import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, 
Watcher}
+import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
 
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.service.{AbstractService, FrontendService}
-import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
-
-/**
- * A abstract service for service discovery
- *
- * @param name   the name of the service itself
- * @param fe the frontend service to publish for service discovery
- */
-abstract class ServiceDiscovery(
-    name: String,
-    fe: FrontendService) extends AbstractService(name) {
-
-  import ServiceDiscovery._
-  import ZooKeeperClientProvider._
-
-  private var _zkClient: CuratorFramework = _
-  private var _serviceNode: PersistentNode = _
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLIST_CONFIGS
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.buildZookeeperClient
+import 
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
+import 
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.ThreadUtils
+
+class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends 
Logging {
 
   /**
    * a pre-defined namespace used to publish the instance of the associate 
service
    */
-  private var _namespace: String = _
-
-  def zkClient: CuratorFramework = _zkClient
+  protected var _namespace: String = _
 
-  def serviceNode: PersistentNode = _serviceNode
+  private lazy val instance: String = serviceDiscovery.fe.connectionUrl
+  private var zkClient: CuratorFramework = _
+  private var serviceNode: PersistentNode = _
 
   def namespace: String = _namespace
 
-  override def initialize(conf: KyuubiConf): Unit = {
-    this.conf = conf
+  def createClient(conf: KyuubiConf): Unit = {
     _namespace = conf.get(HA_ZK_NAMESPACE)
     val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
     val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
-    _zkClient = buildZookeeperClient(conf)
+    zkClient = buildZookeeperClient(conf)
     zkClient.getConnectionStateListenable.addListener(new 
ConnectionStateListener {
       private val isConnected = new AtomicBoolean(false)
 
@@ -88,7 +87,7 @@ abstract class ServiceDiscovery(
                 override def run(): Unit = if (!isConnected.get()) {
                   error(s"Zookeeper client connection state changed to: 
$newState, but failed to" +
                     s" reconnect in ${delay / 1000} seconds. Give up retry. ")
-                  stopGracefully()
+                  serviceDiscovery.stopGracefully()
                 }
               },
               delay,
@@ -98,144 +97,67 @@ abstract class ServiceDiscovery(
       }
     })
     zkClient.start()
-    super.initialize(conf)
   }
 
-  override def start(): Unit = {
-    val instance = fe.connectionUrl
-    _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
+  def registerService(conf: KyuubiConf): Unit = {
+    serviceNode = createServiceNode(conf, zkClient, namespace, instance)
     // Set a watch on the serviceNode
     val watcher = new DeRegisterWatcher
     if 
(zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) 
== null) {
       // No node exists, throw exception
       throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
-        s"instance[${fe.connectionUrl}] on ZooKeeper.")
+        s"instance[${instance}] on ZooKeeper.")
     }
-    super.start()
   }
 
-  override def stop(): Unit = {
-    closeServiceNode()
-    if (zkClient != null) zkClient.close()
-    super.stop()
-  }
-
-  // close the EPHEMERAL_SEQUENTIAL node in zk
-  protected def closeServiceNode(): Unit = {
-    if (_serviceNode != null) {
+  /**
+   * Close the serviceNode if not closed yet
+   * and the znode will be deleted upon the serviceNode closed.
+   */
+  def deregisterService(): Unit = {
+    // close the EPHEMERAL_SEQUENTIAL node in zk
+    if (serviceNode != null) {
       try {
-        _serviceNode.close()
+        serviceNode.close()
       } catch {
         case e: IOException =>
           error("Failed to close the persistent ephemeral znode" + 
serviceNode.getActualPath, e)
       } finally {
-        _serviceNode = null
+        serviceNode = null
       }
     }
   }
 
-  // stop the server genteelly
-  def stopGracefully(): Unit = {
-    stop()
-    while (fe.be != null && fe.be.sessionManager.getOpenSessionCount > 0) {
-      Thread.sleep(1000 * 60)
+  def postDeregisterService(): Boolean = {
+    if (namespace != null) {
+      zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
+      true
+    } else {
+      false
     }
-    fe.serverable.stop()
+  }
+
+  def closeClient(): Unit = {
+    deregisterService()
+    if (zkClient != null) zkClient.close()
   }
 
   class DeRegisterWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
       if (event.getType == Watcher.Event.EventType.NodeDeleted) {
-        warn(s"This Kyuubi instance ${fe.connectionUrl} is now de-registered 
from" +
+        warn(s"This Kyuubi instance ${instance} is now de-registered from" +
           s" ZooKeeper. The server will be shut down after the last client 
session completes.")
-        stopGracefully()
+        serviceDiscovery.stopGracefully()
       }
     }
   }
-
 }
 
-object ServiceDiscovery extends Logging {
-
+object ServiceDiscoveryClient extends Logging {
   final private lazy val connectionChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
 
-  def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
-    val zkEnsemble = conf.get(HA_ZK_QUORUM)
-    zkEnsemble != null && zkEnsemble.nonEmpty
-  }
-
-  def getServerHost(zkClient: CuratorFramework, namespace: String): 
Option[(String, Int)] = {
-    // TODO: use last one because to avoid touching some maybe-crashed engines
-    // We need a big improvement here.
-    getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match {
-      case Seq(sn) => Some((sn.host, sn.port))
-      case _ => None
-    }
-  }
-
-  def getEngineByRefId(
-      zkClient: CuratorFramework,
-      namespace: String,
-      engineRefId: String): Option[(String, Int)] = {
-    getServiceNodesInfo(zkClient, namespace, silent = true)
-      .find(_.engineRefId.exists(_.equals(engineRefId)))
-      .map(data => (data.host, data.port))
-  }
-
-  def getServiceNodesInfo(
-      zkClient: CuratorFramework,
-      namespace: String,
-      sizeOpt: Option[Int] = None,
-      silent: Boolean = false): Seq[ServiceNodeInfo] = {
-    try {
-      val hosts = zkClient.getChildren.forPath(namespace)
-      val size = sizeOpt.getOrElse(hosts.size())
-      hosts.asScala.takeRight(size).map { p =>
-        val path = ZKPaths.makePath(namespace, p)
-        val instance = new String(zkClient.getData.forPath(path), 
StandardCharsets.UTF_8)
-        val (host, port) = parseInstanceHostPort(instance)
-        val version = 
p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
-        val engineRefId = 
p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
-        info(s"Get service instance:$instance and version:$version under 
$namespace")
-        ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
-      }
-    } catch {
-      case _: Exception if silent => Nil
-      case e: Exception =>
-        error(s"Failed to get service node info", e)
-        Nil
-    }
-  }
-
-  @VisibleForTesting
-  private[client] def parseInstanceHostPort(instance: String): (String, Int) = 
{
-    val maybeInfos = instance.split(";")
-      .map(_.split("=", 2))
-      .filter(_.size == 2)
-      .map(i => (i(0), i(1)))
-      .toMap
-    if (maybeInfos.size > 0) {
-      (
-        maybeInfos.get("hive.server2.thrift.bind.host").get,
-        maybeInfos.get("hive.server2.thrift.port").get.toInt)
-    } else {
-      val strings = instance.split(":")
-      (strings(0), strings(1).toInt)
-    }
-  }
-
-  def createAndGetServiceNode(
-      conf: KyuubiConf,
-      zkClient: CuratorFramework,
-      namespace: String,
-      instance: String,
-      version: Option[String] = None,
-      external: Boolean = false): String = {
-    createServiceNode(conf, zkClient, namespace, instance, version, 
external).getActualPath
-  }
-
-  private def createServiceNode(
+  private[client] def createServiceNode(
       conf: KyuubiConf,
       zkClient: CuratorFramework,
       namespace: String,
@@ -301,7 +223,7 @@ object ServiceDiscovery extends Logging {
    * HiveServer2 should store connection params in ZK
    * when using dynamic service discovery for simpler client connection string.
    */
-  private def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
+  private[client] def addConfsToPublish(conf: KyuubiConf, instance: String): 
String = {
     if (!instance.contains(":")) {
       return instance
     }
@@ -326,13 +248,3 @@ object ServiceDiscovery extends Logging {
     confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
   }
 }
-
-case class ServiceNodeInfo(
-    namespace: String,
-    nodeName: String,
-    host: String,
-    port: Int,
-    version: Option[String],
-    engineRefId: Option[String]) {
-  def instance: String = s"$host:$port"
-}

Reply via email to