This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ed9bc9e0fcabfd1b1f764cdfef81cbc8b8e38820
Author: xyuanlu <[email protected]>
AuthorDate: Tue Apr 25 21:03:42 2023 -0700

    Add a Trie class to represent RecursivePersistWatcherListener in ZkClient 
(#2439)
    
    ZkPathRecursiveWatcherTrie will be used as a registry for ZK persistent 
recursive watchers.
    When a persistent recursive watcher is registered on path /X, ZK will send 
out a data change event for any data/child change under the tree structure of 
/X. The event only includes the path of the changed ZNode.
    
    In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need 
to trace back up the path and notify all registered recursive persist listeners 
about the node change.
    ref: 
https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
---
 .../zkclient/RecursivePersistListener.java         |  41 +++++
 .../zkclient/util/ZkPathRecursiveWatcherTrie.java  | 169 +++++++++++++++++++++
 .../util/TestZkPathRecursiveWatcherTrie.java       | 104 +++++++++++++
 3 files changed, 314 insertions(+)

diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
new file mode 100644
index 000000000..96c09b340
--- /dev/null
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
@@ -0,0 +1,41 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * A {@link RecursivePersistListener} can be registered at a {@link ZkClient} to listen for all
+ * ZNode changes (node added, node removed, node data changed) in the tree structure under a
+ * given path.
+ *
+ * The listener is a persistent listener; there is no need to resubscribe.
+ *
+ */
+public interface RecursivePersistListener {
+  /**
+   * Invoked when a node is added or removed, or when node data changes, in the tree structure
+   * of the path this RecursivePersistListener is subscribed to.
+   * @param dataPath the path of the ZNode on which the change happened
+   * @param eventType the event type, including NodeCreated, NodeDataChanged and NodeDeleted
+   * @throws Exception implementations may throw any exception while handling the change
+   */
+  public void handleZNodeChange(String dataPath, Watcher.Event.EventType 
eventType)
+      throws Exception;
+}
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..6bdf0cceb
--- /dev/null
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,169 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+
+
+/**
+ * ZkPathRecursiveWatcherTrie is used as a registry of persistent recursive watchers.
+ * When a persistent recursive watcher is registered on path /X, ZK will send out a data change
+ * event for any data/child change under the tree structure of /X. The event only includes the
+ * path of the changed ZNode.
+ * In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need to trace back up the
+ * path and notify all registered recursive persist listeners about the node change.
+ * ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
+ *
+ * Mutating operations ({@link #addRecursiveListener} and {@link #clear}) synchronize on this
+ * instance.
+ */
+public class ZkPathRecursiveWatcherTrie {
+
+  /** Root node of PathTrie; represents the path "/" and is never replaced. */
+  private final TrieNode _rootNode;
+
+  static class TrieNode {
+
+    final String _value;   // Segmented ZNode path at current level
+    // A map of segmented ZNode path of next level to TrieNode. We keep the same initial
+    // children size as Zk server
+    final Map<String, TrieNode> _children = new HashMap<>(4);
+    // The set of recursive persist listeners registered on the current path
+    final Set<RecursivePersistListener> _recursiveListeners = new HashSet<>(4);
+
+    /**
+     * Create a trie node holding the given path segment.
+     *
+     * @param value the value (path segment) stored in this node
+     */
+    private TrieNode(String value) {
+      _value = value;
+    }
+
+    /**
+     * The value stored in this node.
+     *
+     * @return the value stored in this node
+     */
+    public String getValue() {
+      return this._value;
+    }
+
+    /**
+     * Add a child to the existing node. If a child with the same name already exists, the
+     * existing child is kept and the given node is ignored.
+     *
+     * @param childName the string name of the child
+     * @param node the node that is the child
+     */
+    void addChild(String childName, TrieNode node) {
+      this._children.putIfAbsent(childName, node);
+    }
+
+    /**
+     * Return the child of a node mapping to the input child name.
+     *
+     * @param childName the name of the child
+     * @return the child of a node, or null if there is no child with that name
+     */
+    @VisibleForTesting
+    TrieNode getChild(String childName) {
+      return this._children.get(childName);
+    }
+
+    /**
+     * Get the children of this trie node.
+     *
+     * @return the live map of child name to child node (not a copy)
+     */
+    @VisibleForTesting
+    Map<String, TrieNode> getChildren() {
+      return _children;
+    }
+
+    /**
+     * Get the set of RecursivePersistListener registered on this node.
+     *
+     * @return the live set of listeners (not a copy); empty if no listener is registered on
+     *         the path
+     */
+    @VisibleForTesting
+    Set<RecursivePersistListener> getRecursiveListeners() {
+      return _recursiveListeners;
+    }
+
+    @Override
+    public String toString() {
+      return "TrieNode [name=" + _value + ", children=" + _children.keySet() + "]";
+    }
+  }
+
+  /**
+   * Construct a new PathTrie with a root node.
+   */
+  public ZkPathRecursiveWatcherTrie() {
+    this._rootNode = new TrieNode("/");
+  }
+
+  /**
+   * Add a listener on a path of the path trie, creating any missing intermediate nodes.
+   * All paths are relative to the root node; a path made only of separators (e.g. "/")
+   * registers the listener on the root node itself.
+   *
+   * @param path the path to add RecursivePersistListener
+   * @param listener the RecursivePersistListener to be added
+   * @throws NullPointerException if path or listener is null
+   * @throws IllegalArgumentException if path is empty
+   */
+  public void addRecursiveListener(final String path, RecursivePersistListener listener) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    if (path.isEmpty()) {
+      throw new IllegalArgumentException("Empty path: " + path);
+    }
+    // A null listener would otherwise be stored silently in the HashSet and only fail later,
+    // when listeners are notified.
+    Objects.requireNonNull(listener, "Listener cannot be null");
+    final List<String> pathComponents = split(path);
+
+    synchronized (this) {
+      TrieNode node = _rootNode;
+      for (final String part : pathComponents) {
+        node = node.getChildren().computeIfAbsent(part, TrieNode::new);
+      }
+      node._recursiveListeners.add(listener);
+    }
+  }
+
+  /**
+   * Clear all nodes and all registered listeners in the trie.
+   */
+  public synchronized void clear() {
+    _rootNode.getChildren().clear();
+    // Listeners registered on "/" live on the root node itself, which is never removed.
+    _rootNode.getRecursiveListeners().clear();
+  }
+
+  /**
+   * Split a path on "/" into its segments, dropping empty and whitespace-only segments.
+   */
+  private static List<String> split(final String path) {
+    return Stream.of(path.split("/")).filter(t -> !t.trim().isEmpty()).collect(Collectors.toList());
+  }
+
+  // only for test
+  @VisibleForTesting
+  TrieNode getRootNode() {
+    return _rootNode;
+  }
+}
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..83d215cf6
--- /dev/null
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,104 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+
+
+public class TestZkPathRecursiveWatcherTrie {
+  private final ZkPathRecursiveWatcherTrie _recursiveWatcherTrie = new ZkPathRecursiveWatcherTrie();
+
+  /**
+   * The test case creates a trie of the following structure. '*' means how many listeners are
+   * added on that path.
+   * [x] means the listener is removed as step 'x' indicates in the test comment.
+   *
+   *                         "/"
+   *                         a
+   *                    /   |        \
+   *                  b*    b2* [2]    b3
+   *                 /                 \
+   *                c                   c
+   *          / /   |  \  \              \
+   *        d* d1* d2* d3* d4*           d
+   *                                      \
+   *                                      e
+   *                                      \
+   *                                      f**   [1] [3]
+   *
+   */
+  @org.testng.annotations.Test
+  public void testAddRemoveWatcher() {
+    System.out.println("START testAddRemoveWatcher at " + new Date(System.currentTimeMillis()));
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new NoOpListener());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new NoOpListener());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d2", new NoOpListener());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d3", new NoOpListener());
+    _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d4", new NoOpListener());
+
+    RecursivePersistListener listenerOnb = new NoOpListener();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b", listenerOnb);
+    RecursivePersistListener listenerOnb2 = new NoOpListener();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b2", listenerOnb2);
+    RecursivePersistListener listenerOnf_1 = new NoOpListener();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1);
+    RecursivePersistListener listenerOnf_2 = new NoOpListener();
+    _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+
+    ZkPathRecursiveWatcherTrie.TrieNode nodeA = _recursiveWatcherTrie.getRootNode().getChild("a");
+    // "a" is an intermediate node: three children (b, b2, b3) and no listener of its own
+    Assert.assertEquals(nodeA.getChildren().size(), 3);
+    Assert.assertTrue(nodeA.getRecursiveListeners().isEmpty());
+    // one listener each on /a/b and /a/b2
+    Assert.assertEquals(nodeA.getChild("b").getRecursiveListeners().size(), 1);
+    Assert.assertEquals(nodeA.getChild("b2").getRecursiveListeners().size(), 1);
+    // /a/b/c has five children: d, d1, d2, d3, d4
+    Assert.assertEquals(nodeA.getChild("b").getChild("c").getChildren().size(), 5);
+
+    // node f should have 2 listeners
+    Assert.assertEquals(
+        nodeA.getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChild("f").getRecursiveListeners().size(), 2);
+
+    // TODO: enable the removal steps below once ZkPathRecursiveWatcherTrie provides
+    // removeRecursiveListener(path, listener).
+   /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1]
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2);          //  step[2]
+    //b2 will be removed. node "a" should have 2 children, b and b3.
+    Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 2);
+    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b3"));
+    Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().containsKey("b"));
+    // path "/a/b3/c/d/e/f still exists with end node "f" has one listener
+    Assert.assertEquals(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChildren().size(), 1);
+    Assert.assertEquals(
+        _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+            .getChild("e").getChild("f").getRecursiveListeners().size(), 1);
+
+    // removing all listeners of /a/b3/c/d/e/f.
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // test no op
+    _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2); // step [3]
+    // b3 should be removed as well as all children nodes of b3
+    Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 1);
+ */
+  }
+
+  /**
+   * Listener that ignores every event; each instance is a distinct registration in the trie.
+   * Static and not named "Test" so it neither captures the enclosing test instance nor shadows
+   * the TestNG annotation.
+   */
+  static class NoOpListener implements RecursivePersistListener {
+
+    @Override
+    public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
+        throws Exception {
+      // intentionally empty
+    }
+  }
+}

Reply via email to