This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 4d0ba2d71 [KYUUBI #3543] Handle zookeeper watch events
4d0ba2d71 is described below

commit 4d0ba2d712e43464e7f2cb878ddd70b5abbbaeab
Author: sychen <[email protected]>
AuthorDate: Fri Oct 14 12:36:51 2022 +0800

    [KYUUBI #3543] Handle zookeeper watch events
    
    In #1785, the `stop()` call of 
`org.apache.kyuubi.ha.client.ServiceDiscovery#stopGracefully` was removed, 
causing the znode to be recreated after deletion.
    
    Currently, when `DeRegisterWatcher` receives the `NodeDeleted` event, the 
znode is not closed immediately. As a result, 
`org.apache.curator.framework.recipes.nodes.PersistentNode#watcher` also 
receives the `NodeDeleted` event and asynchronously recreates the same znode, 
which prevents the service node from shutting down gracefully.
    
    In addition, because a ZooKeeper watcher fires only once, we should 
re-register the watcher after receiving the `NodeDataChanged` event to avoid 
missing the next `NodeDeleted` event.
    
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #3543 from cxzl25/ha_zk_watcher.
    
    Closes #3543
    
    b3a109e4 [sychen] volatile
    eb16380f [sychen] Merge branch 'master' into ha_zk_watcher
    8621aee0 [sychen] add test
    c4ad5309 [sychen] handle zk watch events
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 72c11b2b2a9cf3e23b66a4ed335dd51533400aa0)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../zookeeper/ZookeeperDiscoveryClient.scala       | 32 ++++++----
 .../zookeeper/ZookeeperDiscoveryClientSuite.scala  | 70 ++++++++++++++++++++--
 2 files changed, 87 insertions(+), 15 deletions(-)

diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index 5b6beaa5d..965bac417 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -61,7 +61,8 @@ import org.apache.kyuubi.util.ThreadUtils
 class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
 
   private val zkClient: CuratorFramework = buildZookeeperClient(conf)
-  private var serviceNode: PersistentNode = _
+  @volatile private var serviceNode: PersistentNode = _
+  private var watcher: DeRegisterWatcher = _
 
   def createClient(): Unit = {
     zkClient.start()
@@ -230,15 +231,10 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
       version: Option[String] = None,
       external: Boolean = false): Unit = {
     val instance = serviceDiscovery.fe.connectionUrl
-    val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+    watcher = new DeRegisterWatcher(instance, serviceDiscovery)
     serviceNode = createPersistentNode(conf, namespace, instance, version, 
external)
     // Set a watch on the serviceNode
-    if (zkClient.checkExists
-        
.usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath) 
== null) {
-      // No node exists, throw exception
-      throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
-        s"instance[${instance}] on ZooKeeper.")
-    }
+    watchNode()
   }
 
   def deregisterService(): Unit = {
@@ -385,12 +381,26 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
     localServiceNode
   }
 
-  class DeRegisterWatcher(instance: String, serviceDiscovery: 
ServiceDiscovery) extends Watcher {
+  private def watchNode(): Unit = {
+    if (zkClient.checkExists
+        
.usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath) 
== null) {
+      // No node exists, throw exception
+      throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
+        s"instance[${watcher.instance}] on ZooKeeper.")
+    }
+  }
+
+  class DeRegisterWatcher(val instance: String, serviceDiscovery: 
ServiceDiscovery)
+    extends Watcher {
     override def process(event: WatchedEvent): Unit = {
       if (event.getType == Watcher.Event.EventType.NodeDeleted) {
-        warn(s"This Kyuubi instance ${instance} is now de-registered from" +
-          s" ZooKeeper. The server will be shut down after the last client 
session completes.")
+        warn(s"This Kyuubi instance $instance is now de-registered from" +
+          " ZooKeeper. The server will be shut down after the last client 
session completes.")
+        ZookeeperDiscoveryClient.this.deregisterService()
         serviceDiscovery.stopGracefully()
+      } else if (event.getType == Watcher.Event.EventType.NodeDataChanged) {
+        warn(s"This Kyuubi instance $instance now receives the NodeDataChanged 
event")
+        ZookeeperDiscoveryClient.this.watchNode()
       }
     }
   }
diff --git 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index dd6394b5f..8056713a4 100644
--- 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -25,17 +25,18 @@ import javax.security.auth.login.Configuration
 
 import scala.collection.JavaConverters._
 
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.hadoop.util.StringUtils
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.ACL
 import org.scalatest.time.SpanSugar._
 
-import org.apache.kyuubi.KerberizedTestHelper
+import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.AuthTypes
-import org.apache.kyuubi.ha.client.DiscoveryClientTests
-import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client._
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
 import org.apache.kyuubi.service._
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -155,4 +156,65 @@ class ZookeeperDiscoveryClientSuite extends 
DiscoveryClientTests with Kerberized
       zkServer.stop()
     }
   }
+
+  test("watcher for zookeeper") {
+    val namespace = "kyuubiwatcher"
+    var discovery: ServiceDiscovery = null
+    val service = new NoopTBinaryFrontendServer() {
+      override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
+        new NoopTBinaryFrontendService(this) {
+          override val discoveryService: Option[Service] = {
+            discovery = new EngineServiceDiscovery(this)
+            Some(discovery)
+          }
+        })
+    }
+
+    conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
+      .set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
+      .set(HA_ZK_SESSION_TIMEOUT, 2000)
+      .set(HA_ADDRESSES, getConnectString)
+      .set(HA_NAMESPACE, namespace)
+      .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+    service.initialize(conf)
+    service.start()
+    assert(service.getServiceState === ServiceState.STARTED)
+
+    val basePath = s"/$namespace"
+    try {
+      withDiscoveryClient(conf) { discoveryClient =>
+        assert(discoveryClient.pathExists(basePath))
+        val children = discoveryClient.getChildren(basePath)
+        assert(children.head ===
+          s"serviceUri=${service.frontendServices.head.connectionUrl};" +
+          s"version=$KYUUBI_VERSION;sequence=0000000000")
+
+        children.foreach { child =>
+          val childPath = s"""$basePath/$child"""
+          val nodeData = discoveryClient.getData(childPath)
+
+          val zkClient = CuratorFrameworkFactory.builder()
+            .connectString(getConnectString)
+            .sessionTimeoutMs(5000)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .build
+          zkClient.start()
+
+          // Trigger the NodeDataChanged event
+          zkClient.setData().forPath(childPath, nodeData)
+          Thread.sleep(3000)
+          // Trigger the NodeDeleted event
+          zkClient.delete().forPath(childPath)
+          zkClient.close()
+        }
+        eventually(timeout(10.seconds), interval(100.millis)) {
+          assert(discovery.getServiceState === ServiceState.STOPPED)
+          assert(service.getServiceState === ServiceState.STOPPED)
+        }
+      }
+    } finally {
+      service.stop()
+      discovery.stop()
+    }
+  }
 }

Reply via email to