This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge in repository https://gitbox.apache.org/repos/asf/curator.git
commit 104935e9a58d97cfe5ed7c8bf27a957476ba6b66 Author: randgalt <[email protected]> AuthorDate: Sun Mar 29 15:33:13 2020 -0500 CURATOR-549 The next phase of this issue will implement a bridge cache that bridges TreeCache for pre 3.6 ZK and CuratorCache for ZK 3.6+. That bridge will need this TreeCache iterator. --- .../curator/framework/recipes/cache/TreeCache.java | 59 +++++- .../framework/recipes/cache/TreeCacheEvent.java | 20 ++ .../framework/recipes/cache/TreeCacheIterator.java | 101 +++++++++++ .../framework/recipes/cache/BaseTestTreeCache.java | 10 + .../cache/TestTreeCacheIteratorAndSize.java | 201 +++++++++++++++++++++ .../recipes/cache/TestTreeCacheRandomTree.java | 6 +- 6 files changed, 389 insertions(+), 8 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index 3bf804c..e054d8b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -217,7 +218,7 @@ public class TreeCache implements Closeable private static final ChildData DEAD = new ChildData("/", null, null); - private static boolean isLive(ChildData cd) + static boolean isLive(ChildData cd) { return cd != null && cd != DEAD; } @@ -226,7 +227,7 @@ public class TreeCache implements Closeable private static final AtomicReferenceFieldUpdater<TreeNode, ConcurrentMap<String, TreeNode>> childrenUpdater = (AtomicReferenceFieldUpdater)AtomicReferenceFieldUpdater.newUpdater(TreeNode.class, ConcurrentMap.class, "children"); - private final class TreeNode implements Watcher, 
BackgroundCallback + final class TreeNode implements Watcher, BackgroundCallback { volatile ChildData childData; final TreeNode parent; @@ -343,7 +344,7 @@ public class TreeCache implements Closeable if ( isLive(oldChildData) ) { - publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData); + publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData, null); } if ( parent == null ) @@ -482,7 +483,14 @@ public class TreeCache implements Closeable } if ( childDataUpdater.compareAndSet(this, oldChildData, toUpdate) ) { - publishEvent(isLive(oldChildData) ? TreeCacheEvent.Type.NODE_UPDATED : TreeCacheEvent.Type.NODE_ADDED, toPublish); + if ( isLive(oldChildData) ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish, oldChildData); + } + else + { + publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish, null); + } break; } } @@ -750,6 +758,45 @@ public class TreeCache implements Closeable return isLive(result) ? result : null; } + /** + * Return an iterator over all nodes in the cache. There are no + * guarantees of accuracy; this is merely the most recent view of the data. + * + * @return a possibly-empty iterator of nodes in the cache + */ + public Iterator<ChildData> iterator() + { + return new TreeCacheIterator(root); + } + + /** + * Return the number of nodes in the cache. There are no + * guarantees of accuracy; this is merely the most recent view of the data. 
+ * + * @return size + */ + public int size() + { + return size(root); + } + + private int size(TreeNode node) + { + int size = 0; + if ( isLive(node.childData) ) + { + ++size; + if ( node.children != null ) + { + for ( TreeNode child : node.children.values() ) + { + size += size(child); + } + } + } + return size; + } + private void callListeners(final TreeCacheEvent event) { listeners.forEach(listener -> @@ -837,9 +884,9 @@ public class TreeCache implements Closeable publishEvent(new TreeCacheEvent(type, null)); } - private void publishEvent(TreeCacheEvent.Type type, ChildData data) + private void publishEvent(TreeCacheEvent.Type type, ChildData data, ChildData oldData) { - publishEvent(new TreeCacheEvent(type, data)); + publishEvent(new TreeCacheEvent(type, data, oldData)); } private void publishEvent(final TreeCacheEvent event) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java index b151037..012b280 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheEvent.java @@ -26,6 +26,7 @@ public class TreeCacheEvent { private final Type type; private final ChildData data; + private final ChildData oldData; /** * Type of change @@ -113,8 +114,19 @@ public class TreeCacheEvent */ public TreeCacheEvent(Type type, ChildData data) { + this(type, data, null); + } + + /** + * @param type event type + * @param data event data or null + * @param oldData event oldData or null + */ + public TreeCacheEvent(Type type, ChildData data, ChildData oldData) + { this.type = type; this.data = data; + this.oldData = oldData; } /** @@ -133,6 +145,14 @@ public class TreeCacheEvent return data; } + /** + * @return the node's old data when the type is {@link 
org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type#NODE_UPDATED} + */ + public ChildData getOldData() + { + return oldData; + } + @Override public String toString() { diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java new file mode 100644 index 0000000..eed42dc --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheIterator.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Iterators; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.NoSuchElementException; + +// depth first iterator over tree cache nodes +class TreeCacheIterator implements Iterator<ChildData> +{ + private final LinkedList<Current> stack = new LinkedList<>(); + private Current current; + + private static class Current + { + final Iterator<TreeCache.TreeNode> iterator; + TreeCache.TreeNode node; + + Current(Iterator<TreeCache.TreeNode> iterator) + { + this.iterator = iterator; + node = iterator.next(); + } + } + + TreeCacheIterator(TreeCache.TreeNode root) + { + current = new Current(Iterators.forArray(root)); + stack.push(current); + } + + @Override + public boolean hasNext() + { + return (current != null) && TreeCache.isLive(current.node.childData); + } + + @Override + public ChildData next() + { + if ( current == null ) + { + throw new NoSuchElementException(); + } + + ChildData result = current.node.childData; // result of next iteration is current node's data + + // set the next node for the next iteration (or note completion) + + do + { + setNext(); + } while ( (current != null) && !TreeCache.isLive(current.node.childData) ); + + return result; + } + + private void setNext() + { + if ( current.node.children != null ) + { + stack.push(current); + current = new Current(current.node.children.values().iterator()); + } + else while ( true ) + { + if ( current.iterator.hasNext() ) + { + current.node = current.iterator.next(); + break; + } + else if ( stack.size() > 0 ) + { + current = stack.pop(); + } + else + { + current = null; // done + break; + } + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java index 246704f..dfeb2ff 100644 --- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java @@ -193,6 +193,16 @@ public class BaseTestTreeCache extends BaseClassForTests { Assert.assertEquals(event.getData().getData(), expectedData, message); } + + if ( event.getType() == TreeCacheEvent.Type.NODE_UPDATED) + { + Assert.assertNotNull(event.getOldData()); + } + else + { + Assert.assertNull(event.getOldData()); + } + return event; } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java new file mode 100644 index 0000000..2d8dbb3 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheIteratorAndSize.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.cache; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +public class TestTreeCacheIteratorAndSize extends CuratorTestBase +{ + @Test + public void testBasic() throws Exception + { + final String[] nodes = { + "/base/test", + "/base/test/3", + "/base/test/3/0", + "/base/test/3/0/0", + "/base/test/3/0/1", + "/base/test/3/1", + "/base/test/3/1/0", + "/base/test/3/1/1", + "/base/test/3/2", + "/base/test/3/2/0", + "/base/test/3/2/1", + "/base/test/3/2/3", + "/base/test/3/3", + "/base/test/3/3/1", + "/base/test/3/3/3" + }; + + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + String basePath = "/base/test"; + try (TreeCache treeCache = new TreeCache(client, basePath) ) + { + treeCache.start(); + + for ( String node : nodes ) + { + client.create().creatingParentsIfNeeded().forPath(node, node.getBytes()); + } + + timing.sleepABit(); // let the cache settle + + Iterator<ChildData> iterator = treeCache.iterator(); + Map<String, byte[]> iteratorValues = new HashMap<>(); + while ( iterator.hasNext() ) + { + ChildData next = iterator.next(); + iteratorValues.put(next.getPath(), next.getData()); + } + + Assert.assertEquals(iteratorValues.size(), nodes.length); + for ( String node : nodes ) + { + Assert.assertEquals(iteratorValues.get(node), node.getBytes()); + } + + Assert.assertEquals(treeCache.size(), nodes.length); + } + } + } + + @Test + public void testIteratorWithRandomGraph() 
throws Exception + { + Map<String, String> pathAndData = new HashMap<>(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + int nodeQty = random.nextInt(100, 200); + int maxPerRow = random.nextInt(1, 10); + int maxDepth = random.nextInt(3, 5); + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + String basePath = "/base/test"; + try (TreeCache treeCache = new TreeCache(client, basePath) ) + { + treeCache.start(); + + client.create().creatingParentsIfNeeded().forPath(basePath, "0".getBytes()); + pathAndData.put(basePath, "0"); + + while ( nodeQty-- > 0 ) + { + int thisDepth = random.nextInt(1, maxDepth + 1); + StringBuilder path = new StringBuilder(basePath); + for ( int i = 0; i < thisDepth; ++i ) + { + path.append("/").append(random.nextInt(maxPerRow)); + long value = random.nextLong(); + pathAndData.put(path.toString(), Long.toString(value)); + client.create().orSetData().forPath(path.toString(), Long.toString(value).getBytes()); + } + } + + timing.sleepABit(); // let the cache settle + + Assert.assertEquals(treeCache.size(), pathAndData.size()); + + // at this point we have a cached graph of random nodes with random values + Iterator<ChildData> iterator = treeCache.iterator(); + while ( iterator.hasNext() ) + { + ChildData next = iterator.next(); + Assert.assertTrue(pathAndData.containsKey(next.getPath())); + Assert.assertEquals(pathAndData.get(next.getPath()).getBytes(), next.getData()); + pathAndData.remove(next.getPath()); + } + + Assert.assertEquals(pathAndData.size(), 0); // above loop should have removed all nodes + } + } + } + + @Test + public void testEmptyTree() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) + { + client.start(); + + try (TreeCache treeCache = new TreeCache(client, "/base/test")) + { + treeCache.start(); + + Iterator<ChildData> iterator = 
treeCache.iterator(); + Assert.assertFalse(iterator.hasNext()); + Assert.assertEquals(treeCache.size(), 0); + } + } + } + + @Test + public void testWithDeletedNodes() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) + { + client.start(); + + try (TreeCache treeCache = new TreeCache(client, "/foo")) + { + treeCache.start(); + + client.create().forPath("/foo"); + client.create().forPath("/foo/a1"); + client.create().forPath("/foo/a2"); + client.create().forPath("/foo/a2/a2.1"); + client.create().forPath("/foo/a2/a2.2"); + client.create().forPath("/foo/a3"); + client.create().forPath("/foo/a3/a3.1"); + client.create().forPath("/foo/a3/a3.2"); + + client.delete().forPath("/foo/a2/a2.2"); + client.delete().forPath("/foo/a3/a3.1"); + + timing.sleepABit(); // let the cache settle + + Iterator<ChildData> iterator = treeCache.iterator(); + Set<String> paths = new HashSet<>(); + while ( iterator.hasNext() ) + { + ChildData next = iterator.next(); + paths.add(next.getPath()); + } + + Assert.assertEquals(paths, Sets.newHashSet("/foo", "/foo/a1", "/foo/a2", "/foo/a2/a2.1", "/foo/a3", "/foo/a3/a3.2")); + Assert.assertEquals(treeCache.size(), 6); + } + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java index 96ce75c..e78bb9e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import static org.testng.Assert.assertNotNull; + public class TestTreeCacheRandomTree extends BaseTestTreeCache { /** @@ -209,7 +211,7 @@ public class TestTreeCacheRandomTree extends 
BaseTestTreeCache { String path = expectedNode.fullPath; Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path); - Assert.assertNotNull(cacheChildren, path); + assertNotNull(cacheChildren, path); if (withDepth && depth == TEST_DEPTH) { return; @@ -233,7 +235,7 @@ public class TestTreeCacheRandomTree extends BaseTestTreeCache private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode) { String path = expectedNode.fullPath; - Assert.assertNotNull(actualChild, path); + assertNotNull(actualChild, path); Assert.assertEquals(actualChild.getData(), expectedNode.data, path); } }
