This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 6d42ee64e Add a Trie class to represent
RecursivePersistWatcherListener in ZkClient (#2439)
6d42ee64e is described below
commit 6d42ee64e422232dc93f96c31a349cb200ea7c62
Author: xyuanlu <[email protected]>
AuthorDate: Tue Apr 25 21:03:42 2023 -0700
Add a Trie class to represent RecursivePersistWatcherListener in ZkClient
(#2439)
ZkPathRecursiveWatcherTrie will be used as a registry for ZK persistent
recursive watchers.
When a persistent recursive watcher is registered on path /X, ZK will send
out a change event for any data/child change under the tree structure of /X.
The event only includes the path of the changed ZNode.
In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need
to trace back along the path and notify all registered recursive persist
listeners about the node change.
ref:
https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
---
.../zkclient/RecursivePersistListener.java | 41 +++++
.../zkclient/util/ZkPathRecursiveWatcherTrie.java | 169 +++++++++++++++++++++
.../util/TestZkPathRecursiveWatcherTrie.java | 104 +++++++++++++
3 files changed, 314 insertions(+)
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
new file mode 100644
index 000000000..96c09b340
--- /dev/null
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java
@@ -0,0 +1,41 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * A {@link RecursivePersistListener} can be registered at a {@link ZkClient} to listen for all
+ * ZNode changes in the tree structure rooted at a given path.
+ *
+ * The listener is a persistent listener: there is no need to resubscribe after an event.
+ *
+ */
+public interface RecursivePersistListener {
+ /**
+ * Invoked when a node is added or removed, or when node data changes, in the tree structure
+ * under the path this RecursivePersistListener is subscribed to.
+ * @param dataPath The path of the ZNode on which the change happened
+ * @param eventType Event type, including NodeCreated, NodeDataChanged and NodeDeleted
+ * @throws Exception if the implementation fails while handling the event
+ */
+ public void handleZNodeChange(String dataPath, Watcher.Event.EventType
eventType)
+ throws Exception;
+}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..6bdf0cceb
--- /dev/null
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,169 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+
+
+/**
+ * ZkPathRecursiveWatcherTrie is used as a registry of persistent recursive watchers.
+ * When a persistent recursive watcher is registered on path /X, ZK sends out a change event for
+ * any data/child change under the tree structure of /X. The event only includes the path of the
+ * changed ZNode.
+ * In ZkClient, whenever we get a dataChange event for path /X/Y/Z, we need to trace back along
+ * the path and notify every recursive persist listener registered on it about the node change.
+ * ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
+ */
+public class ZkPathRecursiveWatcherTrie {
+
+ /** Root node of the trie; it is created with the value "/" */
+ private final TrieNode _rootNode;
+
+ static class TrieNode {
+
+ final String _value; // The ZNode path segment at the current level
+ // A map of segmented ZNode path of next level to TrieNode, We keep the
same initial
+ // children size as Zk server
+ final Map<String, TrieNode> _children = new HashMap<>(4);
+ // The set of recursive persist listeners registered on the current path
+ Set<RecursivePersistListener> _recursiveListeners = new HashSet<>(4);
+
+ /**
+ * Create a trie node for one path segment. No reference to the parent node is kept.
+ *
+ * @param value the path segment stored in this node
+ */
+ private TrieNode(String value) {
+ _value = value;
+ }
+
+ /**
+ * Get the path segment stored in this node.
+ *
+ * @return the value stored in this node
+ */
+ public String getValue() {
+ return this._value;
+ }
+
+ /**
+ * Add a child to this node. If a child with the same name already exists it is kept.
+ *
+ * @param childName the string name of the child
+ * @param node the node to add as the child
+ */
+ void addChild(String childName, TrieNode node) {
+ this._children.putIfAbsent(childName, node);
+ }
+
+ /**
+ * Return the child of this node that is mapped to the input child name.
+ *
+ * @param childName the name of the child
+ * @return the child node, or null if this node has no child with that name
+ */
+ @VisibleForTesting
+ TrieNode getChild(String childName) {
+ return this._children.get(childName);
+ }
+
+ /**
+ * Get the children of this trie node, keyed by child name.
+ *
+ * @return the backing map of this node (not a copy); changes to it change the node
+ */
+ @VisibleForTesting
+ Map<String, TrieNode> getChildren() {
+ return _children;
+ }
+
+ /**
+ * Get the set of RecursivePersistListener registered on this node.
+ * The set is empty if no listener is registered on the path.
+ * @return the backing listener set of this node (not a copy)
+ */
+ @VisibleForTesting
+ Set<RecursivePersistListener> getRecursiveListeners() {
+ return _recursiveListeners;
+ }
+
+ @Override
+ public String toString() {
+ return "TrieNode [name=" + _value + ", children=" + _children.keySet() +
"]";
+ }
+ }
+
+ /**
+ * Construct a new, empty trie that contains only the root node "/".
+ */
+ public ZkPathRecursiveWatcherTrie() {
+ this._rootNode = new TrieNode( "/");
+ }
+
+ /**
+ * Register a listener on a path, creating the missing trie nodes along that path.
+ * Blank segments are dropped when the path is split, so "/" registers on the root node.
+ * @param path the path to add RecursivePersistListener; a null or empty path is rejected
+ * @param listener the RecursivePersistListener to be added; it is not null-checked here
+ */
+ public void addRecursiveListener(final String path, RecursivePersistListener
listener) {
+ Objects.requireNonNull(path, "Path cannot be null");
+
+ if (path.isEmpty()) {
+ throw new IllegalArgumentException("Empty path: " + path);
+ }
+ final List<String> pathComponents = split(path);
+
+ synchronized(this) { // same monitor that clear() holds
+ TrieNode parent = _rootNode;
+ for (final String part : pathComponents) {
+ parent = parent.getChildren().computeIfAbsent(part, (p)-> new
TrieNode(part) );
+ }
+ parent._recursiveListeners.add(listener);
+ }
+ }
+
+ /**
+ * Remove every node below the root. NOTE(review): listeners on the root node itself are kept.
+ */
+ public synchronized void clear() {
+ _rootNode.getChildren().clear();
+ }
+
+ private static List<String> split(final String path) { // drops blank segments
+ return Stream.of(path.split("/")).filter(t ->
!t.trim().isEmpty()).collect(Collectors.toList());
+ }
+
+ // only for test
+ @VisibleForTesting
+ TrieNode getRootNode() {
+ return _rootNode;
+ }
+}
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
new file mode 100644
index 000000000..83d215cf6
--- /dev/null
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java
@@ -0,0 +1,104 @@
+package org.apache.helix.zookeeper.zkclient.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+
+
+public class TestZkPathRecursiveWatcherTrie {
+ ZkPathRecursiveWatcherTrie _recursiveWatcherTrie = new
ZkPathRecursiveWatcherTrie();
+
+ /**
+ * The test case creates a trie of the following structure. The number of '*' is the number
+ * of listeners added on that path.
+ * [x] means a listener is removed at step 'x', as indicated in the test comments.
+ *
+ * "/"
+ * a
+ * / | \
+ * b* b2* [2] b3
+ * / \
+ * c c
+ * / / | \ \ \
+ * d* d1* d2* d3* d4* d
+ * \
+ * e
+ * \
+ * f** [1] [3]
+ * NOTE(review): steps [1]-[3] only appear in the commented-out section at the end of the test.
+ */
+ @org.testng.annotations.Test
+ public void testAddRemoveWatcher() {
+ System.out.println("START testAddRemoveWatcher at " + new
Date(System.currentTimeMillis()));
+ _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new Test());
+ _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new Test());
+ _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d2", new Test());
+ _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d3", new Test());
+ _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d4", new Test());
+
+ Test listenerOnb = new Test();
+ _recursiveWatcherTrie.addRecursiveListener("/a/b", listenerOnb);
+ Test listenerOnb2 = new Test();
+ _recursiveWatcherTrie.addRecursiveListener("/a/b2", listenerOnb2);
+ Test listenerOnf_1 = new Test();
+ _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1);
+ Test listenerOnf_2 = new Test();
+ _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2);
+
+ // node f should have 2 listeners
+ Assert.assertEquals(
+
_recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+ .getChild("e").getChild("f").getRecursiveListeners().size(), 2);
+
+ /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f",
listenerOnf_1); // step [1]
+ _recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2);
// step[2]
+ //b2 will be removed. node "a" should have 2 children, b and b3.
+
Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(),
2);
+
Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b3"));
+
Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b"));
+ // path "/a/b3/c/d/e/f still exists with end node "f" has one listener
+ Assert.assertEquals(
+
_recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+ .getChild("e").getChildren().size(), 1);
+ Assert.assertEquals(
+
_recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d")
+ .getChild("e").getChild("f").getRecursiveListeners().size(), 1);
+
+ // removing all listeners of /a/b3/c/d/e/f.
+ _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f",
listenerOnf_1); // test no op
+ _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f",
listenerOnf_2);
+ // b3 should be removed as well as all children nodes of b3
+
Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(),
1);
+ */
+ }
+
+ class Test implements RecursivePersistListener { // listener whose callback does nothing
+
+ @Override
+ public void handleZNodeChange(String dataPath, Watcher.Event.EventType
eventType)
+ throws Exception {
+
+ }
+ }
+}