This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes in repository https://gitbox.apache.org/repos/asf/curator.git
commit 01706e06f899c17c2ce8f1a10de7bc059c25c200 Author: randgalt <randg...@apache.org> AuthorDate: Fri Nov 8 10:53:38 2019 -0500 CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed --- .../framework/recipes/cache/CuratorCacheImpl.java | 15 +- .../framework/recipes/cache/TestCuratorCache.java | 45 ------ .../recipes/cache/TestCuratorCacheEdges.java | 153 +++++++++++++++++++++ 3 files changed, 164 insertions(+), 49 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index ee95570..6de4d06 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -30,6 +30,7 @@ import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; @@ -157,13 +158,18 @@ class CuratorCacheImpl implements CuratorCache } } - private void nodeChildrenChanged(String fromPath) + private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat) { if ( (state.get() != State.STARTED) || !recursive ) { return; } + if ( (oldStat != null) && (oldStat.getCversion() == newStat.getCversion()) ) + { + return; // children haven't changed + } + try { BackgroundCallback callback = (__, event) -> { @@ -203,8 +209,8 @@ class CuratorCacheImpl implements CuratorCache BackgroundCallback callback = (__, event) -> { if ( event.getResultCode() == OK.intValue() ) { - putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); - nodeChildrenChanged(event.getPath()); + Optional<ChildData> childData = putStorage(new ChildData(event.getPath(), event.getStat(), event.getData())); + checkChildrenChanged(event.getPath(), childData.map(ChildData::getStat).orElse(null), event.getStat()); } else if ( event.getResultCode() == NONODE.intValue() ) { @@ -233,7 +239,7 @@ class CuratorCacheImpl implements CuratorCache } } - private void putStorage(ChildData data) + private Optional<ChildData> putStorage(ChildData data) { Optional<ChildData> previousData = storage.put(data); if ( previousData.isPresent() ) @@ -247,6 +253,7 @@ class CuratorCacheImpl implements CuratorCache { callListeners(l -> l.event(NODE_CREATED, null, data)); } + return previousData; } private void removeStorage(String path) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java index 8560f87..2d6fb0d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java @@ -40,51 +40,6 @@ import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.bu public class TestCuratorCache extends CuratorTestBase { @Test - public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster - { - try (TestingCluster cluster = new TestingCluster(3)) - { - cluster.start(); - - try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) - { - client.start(); - client.create().creatingParentsIfNeeded().forPath("/test"); - - try (CuratorCache cache = CuratorCache.build(client, "/test")) - { - cache.start(); - - CountDownLatch reconnectLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener((__, newState) -> { - if ( newState == ConnectionState.RECONNECTED ) - { - reconnectLatch.countDown(); - } - }); - CountDownLatch latch = new CountDownLatch(3); - cache.listenable().addListener((__, ___, ____) -> latch.countDown()); - - client.create().forPath("/test/one"); - client.create().forPath("/test/two"); - client.create().forPath("/test/three"); - - Assert.assertTrue(timing.awaitLatch(latch)); - - InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); - cluster.killServer(connectionInstance); - - Assert.assertTrue(timing.awaitLatch(reconnectLatch)); - - timing.sleepABit(); - - Assert.assertEquals(cache.storage().stream().count(), 4); - } - } - } - } - - @Test public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache { CuratorCacheStorage storage = new StandardCuratorCacheStorage(false); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java new file mode 100644 index 0000000..f20f775 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEdges.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.cache; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.CuratorTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE; +import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder; + +@Test(groups = CuratorTestBase.zk36Group) +public class TestCuratorCacheEdges extends CuratorTestBase +{ + @Test + public void testReconnectConsistency() throws Exception + { + final byte[] first = "one".getBytes(); + final byte[] second = "two".getBytes(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().forPath("/root", first); + client.create().forPath("/root/1", first); + client.create().forPath("/root/2", first); + client.create().forPath("/root/1/11", first); + client.create().forPath("/root/1/12", first); + client.create().forPath("/root/1/13", first); + client.create().forPath("/root/2/21", first); + client.create().forPath("/root/2/22", first); + + CuratorCacheStorage storage = CuratorCacheStorage.standard(); + try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build()) + { + CountDownLatch latch = new CountDownLatch(1); + cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build()); + cache.start(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + + // we now have a storage loaded with the initial nodes created + + // simulate nodes changing during a partition + + client.delete().forPath("/root/2/21"); + client.delete().forPath("/root/2/22"); + client.delete().forPath("/root/2"); + + client.setData().forPath("/root", second); + client.create().forPath("/root/1/11/111", second); + client.create().forPath("/root/1/11/111/1111", second); + client.create().forPath("/root/1/11/111/1112", second); + client.create().forPath("/root/1/13/131", second); + client.create().forPath("/root/1/13/132", second); + client.create().forPath("/root/1/13/132/1321", second); + + try (CuratorCache cache = CuratorCache.builder(client, "/root").withStorage(storage).withOptions(DO_NOT_CLEAR_ON_CLOSE).build()) + { + CountDownLatch latch = new CountDownLatch(1); + cache.listenable().addListener(CuratorCacheListener.builder().forInitialized(latch::countDown).build()); + cache.start(); + Assert.assertTrue(timing.awaitLatch(latch)); + } + + Assert.assertEquals(storage.size(), 11); + Assert.assertEquals(storage.get("/root").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/11").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/11/111").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/11/111/1111").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/11/111/1112").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/12").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/13").map(ChildData::getData).orElse(null), first); + Assert.assertEquals(storage.get("/root/1/13/131").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/13/132").map(ChildData::getData).orElse(null), second); + Assert.assertEquals(storage.get("/root/1/13/132/1321").map(ChildData::getData).orElse(null), second); + } + } + + @Test + public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster + { + try (TestingCluster cluster = new TestingCluster(3)) + { + cluster.start(); + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) + { + client.start(); + client.create().creatingParentsIfNeeded().forPath("/test"); + + try (CuratorCache cache = CuratorCache.build(client, "/test")) + { + cache.start(); + + CountDownLatch reconnectLatch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener((__, newState) -> { + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectLatch.countDown(); + } + }); + CountDownLatch latch = new CountDownLatch(3); + cache.listenable().addListener((__, ___, ____) -> latch.countDown()); + + client.create().forPath("/test/one"); + client.create().forPath("/test/two"); + client.create().forPath("/test/three"); + + Assert.assertTrue(timing.awaitLatch(latch)); + + InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + cluster.killServer(connectionInstance); + + Assert.assertTrue(timing.awaitLatch(reconnectLatch)); + + timing.sleepABit(); + + Assert.assertEquals(cache.storage().stream().count(), 4); + } + } + } + } +}