This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 4d0ba2d71 [KYUUBI #3543] Handle zookeeper watch events
4d0ba2d71 is described below
commit 4d0ba2d712e43464e7f2cb878ddd70b5abbbaeab
Author: sychen <[email protected]>
AuthorDate: Fri Oct 14 12:36:51 2022 +0800
[KYUUBI #3543] Handle zookeeper watch events
In #1785, the `stop()` call was removed from
`org.apache.kyuubi.ha.client.ServiceDiscovery#stopGracefully`, which causes the
znode to be recreated after it has been deleted.
Currently, when `DeRegisterWatcher` receives the `NodeDeleted` event, it does not
immediately close the persistent node. As a result,
`org.apache.curator.framework.recipes.nodes.PersistentNode#watcher` also receives
the `NodeDeleted` event and asynchronously recreates the same znode, which
prevents the service node from shutting down gracefully.
In addition, because ZooKeeper watches are one-time triggers, the watcher must be
re-registered after a `NodeDataChanged` event is received; otherwise the next
`NodeDeleted` event would be missed.
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3543 from cxzl25/ha_zk_watcher.
Closes #3543
b3a109e4 [sychen] volatile
eb16380f [sychen] Merge branch 'master' into ha_zk_watcher
8621aee0 [sychen] add test
c4ad5309 [sychen] handle zk watch events
Authored-by: sychen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 72c11b2b2a9cf3e23b66a4ed335dd51533400aa0)
Signed-off-by: Cheng Pan <[email protected]>
---
.../zookeeper/ZookeeperDiscoveryClient.scala | 32 ++++++----
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 70 ++++++++++++++++++++--
2 files changed, 87 insertions(+), 15 deletions(-)
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index 5b6beaa5d..965bac417 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -61,7 +61,8 @@ import org.apache.kyuubi.util.ThreadUtils
class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
private val zkClient: CuratorFramework = buildZookeeperClient(conf)
- private var serviceNode: PersistentNode = _
+ @volatile private var serviceNode: PersistentNode = _
+ private var watcher: DeRegisterWatcher = _
def createClient(): Unit = {
zkClient.start()
@@ -230,15 +231,10 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
version: Option[String] = None,
external: Boolean = false): Unit = {
val instance = serviceDiscovery.fe.connectionUrl
- val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+ watcher = new DeRegisterWatcher(instance, serviceDiscovery)
serviceNode = createPersistentNode(conf, namespace, instance, version,
external)
// Set a watch on the serviceNode
- if (zkClient.checkExists
-
.usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath)
== null) {
- // No node exists, throw exception
- throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
- s"instance[${instance}] on ZooKeeper.")
- }
+ watchNode()
}
def deregisterService(): Unit = {
@@ -385,12 +381,26 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
localServiceNode
}
- class DeRegisterWatcher(instance: String, serviceDiscovery:
ServiceDiscovery) extends Watcher {
+ private def watchNode(): Unit = {
+ if (zkClient.checkExists
+
.usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath)
== null) {
+ // No node exists, throw exception
+ throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
+ s"instance[${watcher.instance}] on ZooKeeper.")
+ }
+ }
+
+ class DeRegisterWatcher(val instance: String, serviceDiscovery:
ServiceDiscovery)
+ extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
- warn(s"This Kyuubi instance ${instance} is now de-registered from" +
- s" ZooKeeper. The server will be shut down after the last client
session completes.")
+ warn(s"This Kyuubi instance $instance is now de-registered from" +
+ " ZooKeeper. The server will be shut down after the last client
session completes.")
+ ZookeeperDiscoveryClient.this.deregisterService()
serviceDiscovery.stopGracefully()
+ } else if (event.getType == Watcher.Event.EventType.NodeDataChanged) {
+ warn(s"This Kyuubi instance $instance now receives the NodeDataChanged
event")
+ ZookeeperDiscoveryClient.this.watchNode()
}
}
}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index dd6394b5f..8056713a4 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -25,17 +25,18 @@ import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.util.StringUtils
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL
import org.scalatest.time.SpanSugar._
-import org.apache.kyuubi.KerberizedTestHelper
+import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClientTests
-import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client._
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.service._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -155,4 +156,65 @@ class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests with Kerberized
zkServer.stop()
}
}
+
+ test("watcher for zookeeper") {
+ val namespace = "kyuubiwatcher"
+ var discovery: ServiceDiscovery = null
+ val service = new NoopTBinaryFrontendServer() {
+ override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+ new NoopTBinaryFrontendService(this) {
+ override val discoveryService: Option[Service] = {
+ discovery = new EngineServiceDiscovery(this)
+ Some(discovery)
+ }
+ })
+ }
+
+ conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+ .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+ .set(HA_ZK_SESSION_TIMEOUT, 2000)
+ .set(HA_ADDRESSES, getConnectString)
+ .set(HA_NAMESPACE, namespace)
+ .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ service.initialize(conf)
+ service.start()
+ assert(service.getServiceState === ServiceState.STARTED)
+
+ val basePath = s"/$namespace"
+ try {
+ withDiscoveryClient(conf) { discoveryClient =>
+ assert(discoveryClient.pathExists(basePath))
+ val children = discoveryClient.getChildren(basePath)
+ assert(children.head ===
+ s"serviceUri=${service.frontendServices.head.connectionUrl};" +
+ s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+ children.foreach { child =>
+ val childPath = s"""$basePath/$child"""
+ val nodeData = discoveryClient.getData(childPath)
+
+ val zkClient = CuratorFrameworkFactory.builder()
+ .connectString(getConnectString)
+ .sessionTimeoutMs(5000)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build
+ zkClient.start()
+
+ // Trigger the NodeDataChanged event
+ zkClient.setData().forPath(childPath, nodeData)
+ Thread.sleep(3000)
+ // Trigger the NodeDeleted event
+ zkClient.delete().forPath(childPath)
+ zkClient.close()
+ }
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ assert(discovery.getServiceState === ServiceState.STOPPED)
+ assert(service.getServiceState === ServiceState.STOPPED)
+ }
+ }
+ } finally {
+ service.stop()
+ discovery.stop()
+ }
+ }
}