Repository: curator Updated Branches: refs/heads/CURATOR-388 [created] 21b087784
PathChildrenCache stops working if container node is auto-removed and later recreated - test and a fix Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/21b08778 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/21b08778 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/21b08778 Branch: refs/heads/CURATOR-388 Commit: 21b0877842d12c61f5c8c7f886d596809e12a628 Parents: d502dde Author: randgalt <[email protected]> Authored: Wed Dec 20 15:39:37 2017 -0500 Committer: randgalt <[email protected]> Committed: Wed Dec 20 15:39:37 2017 -0500 ---------------------------------------------------------------------- .../recipes/cache/PathChildrenCache.java | 17 ++++++- .../recipes/cache/TestPathChildrenCache.java | 50 +++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/21b08778/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index c5449f2..f2744f9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -479,7 +479,8 @@ public class PathChildrenCache implements Closeable { STANDARD, FORCE_GET_DATA_AND_STAT, - POST_INITIALIZED + POST_INITIALIZED, + NO_NODE_EXCEPTION } void refresh(final RefreshMode mode) throws Exception @@ -500,6 +501,20 @@ public class PathChildrenCache implements Closeable { processChildren(event.getChildren(), mode); } + else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + if ( mode == RefreshMode.NO_NODE_EXCEPTION ) + { + log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing."); + ensureContainers.reset(); + } + else + { + log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers"); + ensureContainers.reset(); + offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION)); + } + } } }; http://git-wip-us.apache.org/repos/asf/curator/blob/21b08778/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 14074f0..da294c8 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.cache; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.UnhandledErrorListener; @@ -29,9 +30,9 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; -import org.apache.curator.test.compatibility.KillSession2; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.test.compatibility.KillSession2; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -47,6 +48,53 @@ import static org.testng.AssertJUnit.assertNotNull; public class TestPathChildrenCache extends BaseClassForTests { @Test + public void testParentContainerMissing() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + PathChildrenCache cache = new PathChildrenCache(client, "/a/b/test", true); + try + { + client.start(); + + final BlockingQueue<PathChildrenCacheEvent.Type> events = Queues.newLinkedBlockingQueue(); + PathChildrenCacheListener listener = new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + events.add(event.getType()); + } + }; + cache.getListenable().addListener(listener); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.INITIALIZED); + + client.create().forPath("/a/b/test/one"); + client.create().forPath("/a/b/test/two"); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + + client.delete().forPath("/a/b/test/one"); + client.delete().forPath("/a/b/test/two"); + client.delete().forPath("/a/b/test"); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + + timing.sleepABit(); + + client.create().creatingParentContainersIfNeeded().forPath("/a/b/test/new"); + Assert.assertEquals(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + } + finally + { + CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testWithBadConnect() throws Exception { final int serverPort = server.getPort();
