This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ff0441c [KYUUBI #1446] Decouple zookeeper from abstract
ServiceDiscovery
ff0441c is described below
commit ff0441c1bedb8df949008ca1510d3189d3c8f4d3
Author: hongdongdong <[email protected]>
AuthorDate: Tue Nov 30 17:16:28 2021 +0800
[KYUUBI #1446] Decouple zookeeper from abstract ServiceDiscovery
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
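Decouple the ZooKeeper-specific logic from the abstract ServiceDiscovery service by moving it into a new ServiceDiscoveryClient class; ServiceDiscovery now delegates client creation, service registration, deregistration, and client shutdown to that class.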
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1448 from hddong/decouple-zeekeeper.
Closes #1446
0294c3c3 [hongdongdong] remove
5da2f944 [hongdongdong] fix
be066f64 [hongdongdong] fix
03038558 [hongdongdong] [KYUUBI #1446] Decouple zookeeper from abstract
ServiceDiscovery
Authored-by: hongdongdong <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../kyuubi/ha/client/EngineServiceDiscovery.scala | 9 +-
.../apache/kyuubi/ha/client/ServiceDiscovery.scala | 195 ++-----------------
.../ServiceDiscoveryClient.scala} | 212 ++++++---------------
3 files changed, 78 insertions(+), 338 deletions(-)
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index 96c7e97..b9d5eba 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -31,13 +31,14 @@ class EngineServiceDiscovery(
fe: FrontendService) extends ServiceDiscovery("EngineServiceDiscovery",
fe) {
override def stop(): Unit = synchronized {
- closeServiceNode()
+ discoveryClient.deregisterService()
conf.get(ENGINE_SHARE_LEVEL) match {
// For connection level, we should clean up the namespace in zk in case
the disk stress.
- case "CONNECTION" if namespace != null =>
+ case "CONNECTION" =>
try {
- zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
- info("Clean up discovery service due to this is connection share
level.")
+ if (discoveryClient.postDeregisterService) {
+ info("Clean up discovery service due to this is connection share
level.")
+ }
} catch {
case NonFatal(e) =>
warn("Failed to clean up Spark engine before stop.", e)
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 7246bdf..13b0dd1 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -17,28 +17,20 @@
package org.apache.kyuubi.ha.client
-import java.io.IOException
import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import com.google.common.annotations.VisibleForTesting
import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.{ConnectionState,
ConnectionStateListener}
-import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST,
RECONNECTED}
import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent,
Watcher}
-import org.apache.zookeeper.CreateMode.PERSISTENT
-import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
+import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient
+import
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
import org.apache.kyuubi.service.{AbstractService, FrontendService}
-import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
/**
* A abstract service for service discovery
@@ -48,92 +40,31 @@ import org.apache.kyuubi.util.{KyuubiHadoopUtils,
ThreadUtils}
*/
abstract class ServiceDiscovery(
name: String,
- fe: FrontendService) extends AbstractService(name) {
+ val fe: FrontendService) extends AbstractService(name) {
- import ServiceDiscovery._
- import ZooKeeperClientProvider._
+ private var _discoveryClient: ServiceDiscoveryClient = _
- private var _zkClient: CuratorFramework = _
- private var _serviceNode: PersistentNode = _
-
- /**
- * a pre-defined namespace used to publish the instance of the associate
service
- */
- private var _namespace: String = _
-
- def zkClient: CuratorFramework = _zkClient
-
- def serviceNode: PersistentNode = _serviceNode
-
- def namespace: String = _namespace
+ def discoveryClient: ServiceDiscoveryClient = _discoveryClient
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
- _namespace = conf.get(HA_ZK_NAMESPACE)
- val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
- val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
- _zkClient = buildZookeeperClient(conf)
- zkClient.getConnectionStateListenable.addListener(new
ConnectionStateListener {
- private val isConnected = new AtomicBoolean(false)
- override def stateChanged(client: CuratorFramework, newState:
ConnectionState): Unit = {
- info(s"Zookeeper client connection state changed to: $newState")
- newState match {
- case CONNECTED | RECONNECTED => isConnected.set(true)
- case LOST =>
- isConnected.set(false)
- val delay = maxRetries.toLong * maxSleepTime
- connectionChecker.schedule(
- new Runnable {
- override def run(): Unit = if (!isConnected.get()) {
- error(s"Zookeeper client connection state changed to:
$newState, but failed to" +
- s" reconnect in ${delay / 1000} seconds. Give up retry. ")
- stopGracefully()
- }
- },
- delay,
- TimeUnit.MILLISECONDS)
- case _ =>
- }
- }
- })
- zkClient.start()
+ _discoveryClient = new ServiceDiscoveryClient(this)
+ discoveryClient.createClient(conf)
+
super.initialize(conf)
}
override def start(): Unit = {
- val instance = fe.connectionUrl
- _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
- // Set a watch on the serviceNode
- val watcher = new DeRegisterWatcher
- if
(zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath)
== null) {
- // No node exists, throw exception
- throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
- s"instance[${fe.connectionUrl}] on ZooKeeper.")
- }
+ discoveryClient.registerService(conf)
super.start()
}
override def stop(): Unit = {
- closeServiceNode()
- if (zkClient != null) zkClient.close()
+ discoveryClient.closeClient()
super.stop()
}
- // close the EPHEMERAL_SEQUENTIAL node in zk
- protected def closeServiceNode(): Unit = {
- if (_serviceNode != null) {
- try {
- _serviceNode.close()
- } catch {
- case e: IOException =>
- error("Failed to close the persistent ephemeral znode" +
serviceNode.getActualPath, e)
- } finally {
- _serviceNode = null
- }
- }
- }
-
// stop the server genteelly
def stopGracefully(): Unit = {
stop()
@@ -143,23 +74,10 @@ abstract class ServiceDiscovery(
fe.serverable.stop()
}
- class DeRegisterWatcher extends Watcher {
- override def process(event: WatchedEvent): Unit = {
- if (event.getType == Watcher.Event.EventType.NodeDeleted) {
- warn(s"This Kyuubi instance ${fe.connectionUrl} is now de-registered
from" +
- s" ZooKeeper. The server will be shut down after the last client
session completes.")
- stopGracefully()
- }
- }
- }
-
}
object ServiceDiscovery extends Logging {
- final private lazy val connectionChecker =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
-
def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
zkEnsemble != null && zkEnsemble.nonEmpty
@@ -234,97 +152,6 @@ object ServiceDiscovery extends Logging {
external: Boolean = false): String = {
createServiceNode(conf, zkClient, namespace, instance, version,
external).getActualPath
}
-
- private def createServiceNode(
- conf: KyuubiConf,
- zkClient: CuratorFramework,
- namespace: String,
- instance: String,
- version: Option[String] = None,
- external: Boolean = false): PersistentNode = {
- val ns = ZKPaths.makePath(null, namespace)
- try {
- zkClient
- .create()
- .creatingParentsIfNeeded()
- .withMode(PERSISTENT)
- .forPath(ns)
- } catch {
- case _: NodeExistsException => // do nothing
- case e: KeeperException =>
- throw new KyuubiException(s"Failed to create namespace '$ns'", e)
- }
-
- val session = conf.get(HA_ZK_ENGINE_REF_ID)
- .map(refId => s"refId=$refId;").getOrElse("")
- val pathPrefix = ZKPaths.makePath(
- namespace,
-
s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
- var serviceNode: PersistentNode = null
- val createMode =
- if (external) CreateMode.PERSISTENT_SEQUENTIAL
- else CreateMode.EPHEMERAL_SEQUENTIAL
- val znodeData =
- if (conf.get(HA_ZK_PUBLIST_CONFIGS) && session.isEmpty) {
- addConfsToPublish(conf, instance)
- } else {
- instance
- }
- try {
- serviceNode = new PersistentNode(
- zkClient,
- createMode,
- false,
- pathPrefix,
- znodeData.getBytes(StandardCharsets.UTF_8))
- serviceNode.start()
- val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
- if (!serviceNode.waitForInitialCreate(znodeTimeout,
TimeUnit.MILLISECONDS)) {
- throw new KyuubiException(s"Max znode creation wait time $znodeTimeout
s exhausted")
- }
- info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for
KyuubiServer uri: " + instance)
- } catch {
- case e: Exception =>
- if (serviceNode != null) {
- serviceNode.close()
- }
- throw new KyuubiException(
- s"Unable to create a znode for this server instance: $instance",
- e)
- }
- serviceNode
- }
-
- /**
- * Refer to the implementation of HIVE-11581 to simplify user connection
parameters.
- * https://issues.apache.org/jira/browse/HIVE-11581
- * HiveServer2 should store connection params in ZK
- * when using dynamic service discovery for simpler client connection string.
- */
- private def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
- if (!instance.contains(":")) {
- return instance
- }
- val hostPort = instance.split(":", 2)
- val confsToPublish = collection.mutable.Map[String, String]()
-
- // Hostname
- confsToPublish += ("hive.server2.thrift.bind.host" -> hostPort(0))
- // Transport mode
- confsToPublish += ("hive.server2.transport.mode" -> "binary")
- // Transport specific confs
- confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
- confsToPublish += ("hive.server2.thrift.sasl.qop" ->
conf.get(KyuubiConf.SASL_QOP))
- // Auth specific confs
- val authenticationMethod =
conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
- confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
- if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
- confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
-
conf.get(KyuubiConf.SERVER_PRINCIPAL).map(KyuubiHadoopUtils.getServerPrincipal)
- .getOrElse(""))
- }
- confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
- }
}
case class ServiceNodeInfo(
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
similarity index 56%
copy from
kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
copy to
kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
index 7246bdf..74d6a59 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
@@ -15,64 +15,63 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.JavaConverters._
-
-import com.google.common.annotations.VisibleForTesting
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.{ConnectionState,
ConnectionStateListener}
-import org.apache.curator.framework.state.ConnectionState.{CONNECTED, LOST,
RECONNECTED}
+import org.apache.curator.framework.state.ConnectionState
+import org.apache.curator.framework.state.ConnectionState.CONNECTED
+import org.apache.curator.framework.state.ConnectionState.LOST
+import org.apache.curator.framework.state.ConnectionState.RECONNECTED
+import org.apache.curator.framework.state.ConnectionStateListener
import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent,
Watcher}
+import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
-import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, Logging}
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.service.{AbstractService, FrontendService}
-import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
-
-/**
- * A abstract service for service discovery
- *
- * @param name the name of the service itself
- * @param fe the frontend service to publish for service discovery
- */
-abstract class ServiceDiscovery(
- name: String,
- fe: FrontendService) extends AbstractService(name) {
-
- import ServiceDiscovery._
- import ZooKeeperClientProvider._
-
- private var _zkClient: CuratorFramework = _
- private var _serviceNode: PersistentNode = _
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLIST_CONFIGS
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.buildZookeeperClient
+import
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
+import
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.ThreadUtils
+
+class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends
Logging {
/**
* a pre-defined namespace used to publish the instance of the associate
service
*/
- private var _namespace: String = _
-
- def zkClient: CuratorFramework = _zkClient
+ protected var _namespace: String = _
- def serviceNode: PersistentNode = _serviceNode
+ private lazy val instance: String = serviceDiscovery.fe.connectionUrl
+ private var zkClient: CuratorFramework = _
+ private var serviceNode: PersistentNode = _
def namespace: String = _namespace
- override def initialize(conf: KyuubiConf): Unit = {
- this.conf = conf
+ def createClient(conf: KyuubiConf): Unit = {
_namespace = conf.get(HA_ZK_NAMESPACE)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
- _zkClient = buildZookeeperClient(conf)
+ zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new
ConnectionStateListener {
private val isConnected = new AtomicBoolean(false)
@@ -88,7 +87,7 @@ abstract class ServiceDiscovery(
override def run(): Unit = if (!isConnected.get()) {
error(s"Zookeeper client connection state changed to:
$newState, but failed to" +
s" reconnect in ${delay / 1000} seconds. Give up retry. ")
- stopGracefully()
+ serviceDiscovery.stopGracefully()
}
},
delay,
@@ -98,144 +97,67 @@ abstract class ServiceDiscovery(
}
})
zkClient.start()
- super.initialize(conf)
}
- override def start(): Unit = {
- val instance = fe.connectionUrl
- _serviceNode = createServiceNode(conf, zkClient, namespace, instance)
+ def registerService(conf: KyuubiConf): Unit = {
+ serviceNode = createServiceNode(conf, zkClient, namespace, instance)
// Set a watch on the serviceNode
val watcher = new DeRegisterWatcher
if
(zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath)
== null) {
// No node exists, throw exception
throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
- s"instance[${fe.connectionUrl}] on ZooKeeper.")
+ s"instance[${instance}] on ZooKeeper.")
}
- super.start()
}
- override def stop(): Unit = {
- closeServiceNode()
- if (zkClient != null) zkClient.close()
- super.stop()
- }
-
- // close the EPHEMERAL_SEQUENTIAL node in zk
- protected def closeServiceNode(): Unit = {
- if (_serviceNode != null) {
+ /**
+ * Close the serviceNode if not closed yet
+ * and the znode will be deleted upon the serviceNode closed.
+ */
+ def deregisterService(): Unit = {
+ // close the EPHEMERAL_SEQUENTIAL node in zk
+ if (serviceNode != null) {
try {
- _serviceNode.close()
+ serviceNode.close()
} catch {
case e: IOException =>
error("Failed to close the persistent ephemeral znode" +
serviceNode.getActualPath, e)
} finally {
- _serviceNode = null
+ serviceNode = null
}
}
}
- // stop the server genteelly
- def stopGracefully(): Unit = {
- stop()
- while (fe.be != null && fe.be.sessionManager.getOpenSessionCount > 0) {
- Thread.sleep(1000 * 60)
+ def postDeregisterService(): Boolean = {
+ if (namespace != null) {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
+ true
+ } else {
+ false
}
- fe.serverable.stop()
+ }
+
+ def closeClient(): Unit = {
+ deregisterService()
+ if (zkClient != null) zkClient.close()
}
class DeRegisterWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
- warn(s"This Kyuubi instance ${fe.connectionUrl} is now de-registered
from" +
+ warn(s"This Kyuubi instance ${instance} is now de-registered from" +
s" ZooKeeper. The server will be shut down after the last client
session completes.")
- stopGracefully()
+ serviceDiscovery.stopGracefully()
}
}
}
-
}
-object ServiceDiscovery extends Logging {
-
+object ServiceDiscoveryClient extends Logging {
final private lazy val connectionChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
- def supportServiceDiscovery(conf: KyuubiConf): Boolean = {
- val zkEnsemble = conf.get(HA_ZK_QUORUM)
- zkEnsemble != null && zkEnsemble.nonEmpty
- }
-
- def getServerHost(zkClient: CuratorFramework, namespace: String):
Option[(String, Int)] = {
- // TODO: use last one because to avoid touching some maybe-crashed engines
- // We need a big improvement here.
- getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match {
- case Seq(sn) => Some((sn.host, sn.port))
- case _ => None
- }
- }
-
- def getEngineByRefId(
- zkClient: CuratorFramework,
- namespace: String,
- engineRefId: String): Option[(String, Int)] = {
- getServiceNodesInfo(zkClient, namespace, silent = true)
- .find(_.engineRefId.exists(_.equals(engineRefId)))
- .map(data => (data.host, data.port))
- }
-
- def getServiceNodesInfo(
- zkClient: CuratorFramework,
- namespace: String,
- sizeOpt: Option[Int] = None,
- silent: Boolean = false): Seq[ServiceNodeInfo] = {
- try {
- val hosts = zkClient.getChildren.forPath(namespace)
- val size = sizeOpt.getOrElse(hosts.size())
- hosts.asScala.takeRight(size).map { p =>
- val path = ZKPaths.makePath(namespace, p)
- val instance = new String(zkClient.getData.forPath(path),
StandardCharsets.UTF_8)
- val (host, port) = parseInstanceHostPort(instance)
- val version =
p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
- val engineRefId =
p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
- info(s"Get service instance:$instance and version:$version under
$namespace")
- ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
- }
- } catch {
- case _: Exception if silent => Nil
- case e: Exception =>
- error(s"Failed to get service node info", e)
- Nil
- }
- }
-
- @VisibleForTesting
- private[client] def parseInstanceHostPort(instance: String): (String, Int) =
{
- val maybeInfos = instance.split(";")
- .map(_.split("=", 2))
- .filter(_.size == 2)
- .map(i => (i(0), i(1)))
- .toMap
- if (maybeInfos.size > 0) {
- (
- maybeInfos.get("hive.server2.thrift.bind.host").get,
- maybeInfos.get("hive.server2.thrift.port").get.toInt)
- } else {
- val strings = instance.split(":")
- (strings(0), strings(1).toInt)
- }
- }
-
- def createAndGetServiceNode(
- conf: KyuubiConf,
- zkClient: CuratorFramework,
- namespace: String,
- instance: String,
- version: Option[String] = None,
- external: Boolean = false): String = {
- createServiceNode(conf, zkClient, namespace, instance, version,
external).getActualPath
- }
-
- private def createServiceNode(
+ private[client] def createServiceNode(
conf: KyuubiConf,
zkClient: CuratorFramework,
namespace: String,
@@ -301,7 +223,7 @@ object ServiceDiscovery extends Logging {
* HiveServer2 should store connection params in ZK
* when using dynamic service discovery for simpler client connection string.
*/
- private def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
+ private[client] def addConfsToPublish(conf: KyuubiConf, instance: String):
String = {
if (!instance.contains(":")) {
return instance
}
@@ -326,13 +248,3 @@ object ServiceDiscovery extends Logging {
confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
}
}
-
-case class ServiceNodeInfo(
- namespace: String,
- nodeName: String,
- host: String,
- port: Int,
- version: Option[String],
- engineRefId: Option[String]) {
- def instance: String = s"$host:$port"
-}