This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes in repository https://gitbox.apache.org/repos/asf/curator.git
commit 97f5d6e26f47b954bb69a79eb2a6a2838ed8e84b Author: randgalt <[email protected]> AuthorDate: Fri Nov 8 10:53:38 2019 -0500 CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed --- .../framework/recipes/cache/CuratorCacheImpl.java | 17 ++- .../framework/recipes/watch/PersistentWatcher.java | 10 +- .../src/site/confluence/index.confluence | 3 +- .../site/confluence/persistent-watcher.confluence | 35 +++++ .../framework/recipes/cache/TestCuratorCache.java | 45 ------ .../recipes/cache/TestCuratorCacheEdges.java | 153 +++++++++++++++++++++ .../recipes/watch/TestPersistentWatcher.java | 1 + 7 files changed, 208 insertions(+), 56 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index ee95570..e6be71c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; @@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache } } - private void nodeChildrenChanged(String fromPath) + private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat) { if ( (state.get() != State.STARTED) || !recursive ) { return; } + if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) ) + { + return; // children haven't changed + } + try { BackgroundCallback callback = (__, event) -> { @@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache BackgroundCallback callback = (__, event) -> { if ( event.getResultCode() == OK.intValue() ) { - putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); - nodeChildrenChanged(event.getPath()); + Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); + checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat()); } else if ( event.getResultCode() == NONODE.intValue() ) { @@ -233,12 +239,12 @@ class CuratorCacheImpl implements CuratorCache } } - private void putStorage(ChildData data) + private Optional<ChildData> putStorage(ChildData data) { Optional<ChildData> previousData = storage.put(data); if ( previousData.isPresent() ) { - if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() ) + if ( previousData.get().getStat().getVersion() != data.getStat().getVersion() ) { callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data)); } @@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache { callListeners(l -> l.event(NODE_CREATED, null, data)); } + return previousData; } private void removeStorage(String path) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index 87ecb6e..187343a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -114,7 +114,7 @@ public class PersistentWatcher implements Closeable client.getConnectionStateListenable().removeListener(connectionStateListener); try { - client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath); + client.watchers().remove(watcher).guaranteed().inBackground().forPath(basePath); } catch ( Exception e ) { @@ -140,7 +140,7 @@ public class PersistentWatcher implements Closeable * * @return listener container */ - public StandardListenerManager<Runnable> getResetListenable() + public Listenable<Runnable> getResetListenable() { return resetListeners; } @@ -150,13 +150,13 @@ public class PersistentWatcher implements Closeable try { BackgroundCallback callback = (__, event) -> { - if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - reset(); + resetListeners.forEach(Runnable::run); } else { - resetListeners.forEach(Runnable::run); + reset(); } }; client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath); diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence index d96b5ce..ab8dc53 100644 --- a/curator-recipes/src/site/confluence/index.confluence +++ b/curator-recipes/src/site/confluence/index.confluence @@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths". |[[Node Cache|node-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.| |[[Tree Cache|tree-cache.html]] \- (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.| -||Nodes|| +||Nodes/Watches|| +|[[Persistent Recursive Watcher|persistent-watcher.html]] \- A managed persistent recursive watcher. The watch will be managed such that it stays set through connection lapses, etc.| |[[Persistent Node|persistent-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| |[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.| |[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| diff --git a/curator-recipes/src/site/confluence/persistent-watcher.confluence b/curator-recipes/src/site/confluence/persistent-watcher.confluence new file mode 100644 index 0000000..4551669 --- /dev/null +++ b/curator-recipes/src/site/confluence/persistent-watcher.confluence @@ -0,0 +1,35 @@ +h1. Persistent Recursive Watcher + +*Note: * PersistentWatcher requires ZooKeeper 3.6\+. + +h2. Description +A managed persistent persistent watcher. The watch will be managed such that it stays set through connection lapses, etc. + +h2. Participating Classes +* PersistentWatcher + +h2. Usage +h3. Creating a PersistentWatcher +{code} +public PersistentWatcher(CuratorFramework client, + String basePath, + boolean recursive) + +Parameters: +client - the client +basePath - path to set the watch on +recursive - ZooKeeper persistent watches can optionally be recursive +{code} + +h2. General Usage +The instance must be started by calling {{start()}}. Call {{close()}} when you want to remove the watch. + +PersistentWatcher presents two listener types: + +* {{Listenable<Watcher> getListenable()}} \- Use this to add watchers. These will behave in the same manner that watchers added +via {{ZooKeeper.addWatch()}} behave. +* {{Listenable<Runnable> getResetListenable()}} \- The Runnables added with this get called once the Persistent Watcher has been successfully set +(or reset after a connection partition). + +h2. Error Handling +PersistentWatcher instances internally monitor connection losses, etc. automatically resetting on reconnection. diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java index 8560f87..2d6fb0d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java @@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu public class TestCuratorCache extends CuratorTestBase { @Test - public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster - { - try (TestingCluster cluster = new TestingCluster(3)) - { - cluster.start(); - - try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) - { - client.start(); - client.create().creatingParentsIfNeeded().forPath("/test"); - - try (CuratorCache cache = CuratorCache.build(client, "/test")) - { - cache.start(); - - CountDownLatch reconnectLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener((__, newState) -> { - if ( newState == ConnectionState.RECONNECTED ) - { - reconnectLatch.countDown(); - } - }); - CountDownLatch latch = new CountDownLatch(3); - cache.listenable().addListener((__, ___, ____) -> latch.countDown()); - - client.create().forPath("/test/one"); - client.create().forPath("/test/two"); - client.create().forPath("/test/three"); - - Assert.assertTrue(timing.awaitLatch(latch)); - - InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); - cluster.killServer(connectionInstance); - - Assert.assertTrue(timing.awaitLatch(reconnectLatch)); - - timing.sleepABit(); - - Assert.assertEquals(cache.storage().stream().count(), 4); - } - } - } - } - - @Test public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache { CuratorCacheStorage storage = new StandardCuratorCacheStorage(false); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java new file mode 100644 index 0000000..f20f775 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCacheEdges extends CuratorTestBase +{ + @Test + public void testReconnectConsistency() throws Exception + { + final byte[] first = "one".getBytes(); + final byte[] second = "two".getBytes(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().forPath("/root", first); + client.create().forPath("/root/1", first); + client.create().forPath("/root/2", first); + client.create().forPath("/root/1/11", first); + client.create().forPath("/root/1/12", first); + client.create().forPath("/root/1/13", first); + client.create().forPath("/root/2/21", first); + client.create().forPath("/root/2/22", first); + + CuratorCacheStorage storage = CuratorCacheStorage.standard(); + try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build()) + { + CountDownLatch latch = new CountDownLatch(1); + cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build()); + cache.start(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + + // we now have a storage loaded with the initial nodes created + + // simulate nodes changing during a partition + + client.delete().forPath("/root/2/21"); + client.delete().forPath("/root/2/22"); + client.delete().forPath("/root/2"); + + client.setData().forPath("/root", second); + client.create().forPath("/root/1/11/111", second); + client.create().forPath("/root/1/11/111/1111", second); + client.create().forPath("/root/1/11/111/1112", second); + client.create().forPath("/root/1/13/131", second); + client.create().forPath("/root/1/13/132", second); + client.create().forPath("/root/1/13/132/1321", second); + + try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build()) + { + CountDownLatch latch = new CountDownLatch(1); + cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build()); + cache.start(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + + Assert.assertEquals(storage.size(), 11); + Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second); + } + } + + @Test + public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster + { + try (TestingCluster cluster = new TestingCluster(3)) + { + cluster.start(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + cache.start(); + + CountDownLatch reconnectLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectLatch.countDown(); + } + }); + CountDownLatch latch = new CountDownLatch(3); + cache.listenable().addListener((__, ___, ____) -> latch.countDown()); + + client.create().forPath("/test/one"); + client.create().forPath("/test/two"); + client.create().forPath("/test/three"); + + Assert.assertTrue(timing.awaitLatch(latch)); + + InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + cluster.killServer(connectionInstance); + + Assert.assertTrue(timing.awaitLatch(reconnectLatch)); + + timing.sleepABit(); + + Assert.assertEquals(cache.storage().stream().count(), 4); + } + } + } + } +} diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java index 534c365..1cf7eb0 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +@Test(groups = CuratorTestBase.zk36Group) public class TestPersistentWatcher extends CuratorTestBase { @Test
