This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit ed9bc9e0fcabfd1b1f764cdfef81cbc8b8e38820 Author: xyuanlu <[email protected]> AuthorDate: Tue Apr 25 21:03:42 2023 -0700 Add a Trie class to represent RecursivePersistWatcherListener in ZkClient (#2439) ZkPathRecursiveWatcherTrie will be used for a registry for ZK persist recursive watcher. When persist recursive watcher is registered on path /X, ZK will send out data change for any data/child changing under the tree structure of /X. The event only include the path of changed ZNode. In ZkClient, when ever we get a dataChange event for path /X/Y/Z, we need to track back the path and notify all registered recursive persist listener and notify them about the node change. ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive --- .../zkclient/RecursivePersistListener.java | 41 +++++ .../zkclient/util/ZkPathRecursiveWatcherTrie.java | 169 +++++++++++++++++++++ .../util/TestZkPathRecursiveWatcherTrie.java | 104 +++++++++++++ 3 files changed, 314 insertions(+) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java new file mode 100644 index 000000000..96c09b340 --- /dev/null +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/RecursivePersistListener.java @@ -0,0 +1,41 @@ +package org.apache.helix.zookeeper.zkclient; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.zookeeper.Watcher; + +/** + * An {@link RecursivePersistListener} can be registered at a {@link ZkClient} for listening on all + * zk children changes in a tree structure for a given path. + * + * The listener is a persist listener. No need to resubscribe. + * + */ +public interface RecursivePersistListener { + /** + * invoked when there is a node added, removed or node data change in the tree structure of + * that RecursivePersistListener subscribed path + * @param dataPath The path of ZNode that change happened + * @param eventType Event type, including NodeCreated, NodeDataChanged and NodeDeleted + * @throws Exception + */ + public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) + throws Exception; +} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java new file mode 100644 index 000000000..6bdf0cceb --- /dev/null +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java @@ -0,0 +1,169 @@ +package org.apache.helix.zookeeper.zkclient.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; + + +/** + * ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers. + * When persist recursive watcher is registered on path /X, ZK will send out data change for any + * data/child changing under the tree structure of /X. The event only include the path of changed + * ZNode. + * In ZkClient, when ever we get a dataChange event for path /X/Y/Z, we need to track back the path + * and notify all registered recursive persist listener and notify them about the node change. + * ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive + */ +public class ZkPathRecursiveWatcherTrie { + + /** Root node of PathTrie */ + private final TrieNode _rootNode; + + static class TrieNode { + + final String _value; // Segmented ZNode path at current level + // A map of segmented ZNode path of next level to TrieNode, We keep the same initial + // children size as Zk server + final Map<String, TrieNode> _children = new HashMap<>(4); + // A list of recursive persist watcher on the current path + Set<RecursivePersistListener> _recursiveListeners = new HashSet<>(4); + + /** + * Create a trie node with parent as parameter. + * + * @param value the value stored in this node + */ + private TrieNode(String value) { + _value = value; + } + + /** + * The value stored in this node. + * + * @return the value stored in this node + */ + public String getValue() { + return this._value; + } + + /** + * Add a child to the existing node. + * + * @param childName the string name of the child + * @param node the node that is the child + */ + void addChild(String childName, TrieNode node) { + this._children.putIfAbsent(childName, node); + } + + /** + * Return the child of a node mapping to the input child name. + * + * @param childName the name of the child + * @return the child of a node + */ + @VisibleForTesting + TrieNode getChild(String childName) { + return this._children.get(childName); + } + + /** + * Get the list of children of this trienode. + * + * @return A collection containing the node's children + */ + @VisibleForTesting + Map<String, TrieNode> getChildren() { + return _children; + } + + /** + * Get the set of RecursivePersistWatcherListener + * Returns an empty set if no listener is registered on the path + * @return + */ + @VisibleForTesting + Set<RecursivePersistListener> getRecursiveListeners() { + return _recursiveListeners; + } + + @Override + public String toString() { + return "TrieNode [name=" + _value + ", children=" + _children.keySet() + "]"; + } + } + + /** + * Construct a new PathTrie with a root node. + */ + public ZkPathRecursiveWatcherTrie() { + this._rootNode = new TrieNode( "/"); + } + + /** + * Add a path to the path trie. All paths are relative to the root node. + * + * @param path the path to add RecursivePersistListener + * @param listener the RecursivePersistListener to be added + */ + public void addRecursiveListener(final String path, RecursivePersistListener listener) { + Objects.requireNonNull(path, "Path cannot be null"); + + if (path.isEmpty()) { + throw new IllegalArgumentException("Empty path: " + path); + } + final List<String> pathComponents = split(path); + + synchronized(this) { + TrieNode parent = _rootNode; + for (final String part : pathComponents) { + parent = parent.getChildren().computeIfAbsent(part, (p)-> new TrieNode(part) ); + } + parent._recursiveListeners.add(listener); + } + } + + /** + * Clear all nodes in the trie. + */ + public synchronized void clear() { + _rootNode.getChildren().clear(); + } + + private static List<String> split(final String path) { + return Stream.of(path.split("/")).filter(t -> !t.trim().isEmpty()).collect(Collectors.toList()); + } + + // only for test + @VisibleForTesting + TrieNode getRootNode() { + return _rootNode; + } +} diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java new file mode 100644 index 000000000..83d215cf6 --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/util/TestZkPathRecursiveWatcherTrie.java @@ -0,0 +1,104 @@ +package org.apache.helix.zookeeper.zkclient.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; + +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; + + +public class TestZkPathRecursiveWatcherTrie { + ZkPathRecursiveWatcherTrie _recursiveWatcherTrie = new ZkPathRecursiveWatcherTrie(); + + /** + * test case create a tire tree of following structure. '*' means how many listener is added + * on that path. + * [x] mean the listener is removed as step 'x' indicated in the test comment. + * + * "/" + * a + * / | \ + * b* b2* [2] b3 + * / \ + * c c + * / / | \ \ \ + * d* d1* d2* d3* d4* d + * \ + * e + * \ + * f** [1] [3] + * + */ + @org.testng.annotations.Test + public void testAddRemoveWatcher() { + System.out.println("START testAddRemoveWatcher at " + new Date(System.currentTimeMillis())); + _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d", new Test()); + _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d1", new Test()); + _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d2", new Test()); + _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d3", new Test()); + _recursiveWatcherTrie.addRecursiveListener("/a/b/c/d4", new Test()); + + Test listenerOnb = new Test(); + _recursiveWatcherTrie.addRecursiveListener("/a/b", listenerOnb); + Test listenerOnb2 = new Test(); + _recursiveWatcherTrie.addRecursiveListener("/a/b2", listenerOnb2); + Test listenerOnf_1 = new Test(); + _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); + Test listenerOnf_2 = new Test(); + _recursiveWatcherTrie.addRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2); + + // node f should have 2 listeners + Assert.assertEquals( + _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d") + .getChild("e").getChild("f").getRecursiveListeners().size(), 2); + + /* _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // step [1] + _recursiveWatcherTrie.removeRecursiveListener("/a/b2", listenerOnb2); // step[2] + //b2 will be removed. node "a" should have 2 children, b and b3. + Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 2); + Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b3")); + Assert.assertTrue(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().contains("b")); + // path "/a/b3/c/d/e/f still exists with end node "f" has one listener + Assert.assertEquals( + _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d") + .getChild("e").getChildren().size(), 1); + Assert.assertEquals( + _recursiveWatcherTrie.getRootNode().getChild("a").getChild("b3").getChild("c").getChild("d") + .getChild("e").getChild("f").getRecursiveListeners().size(), 1); + + // removing all listeners of /a/b3/c/d/e/f. + _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_1); // test no op + _recursiveWatcherTrie.removeRecursiveListener("/a/b3/c/d/e/f", listenerOnf_2); + // b3 should be removed as well as all children nodes of b3 + Assert.assertEquals(_recursiveWatcherTrie.getRootNode().getChild("a").getChildren().size(), 1); + */ + } + + class Test implements RecursivePersistListener { + + @Override + public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) + throws Exception { + + } + } +}
