Repository: curator
Updated Branches:
  refs/heads/master 115346bf5 -> 2f33fafbc
PathChildrenCache was not handling CONNECTED state

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2f33fafb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2f33fafb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2f33fafb

Branch: refs/heads/master
Commit: 2f33fafbc58303009d2f78a5ed0df715a799a3c9
Parents: 115346b
Author: randgalt <[email protected]>
Authored: Sat May 21 12:15:10 2016 -0500
Committer: randgalt <[email protected]>
Committed: Sat May 21 12:15:10 2016 -0500

----------------------------------------------------------------------
 .../recipes/cache/PathChildrenCache.java | 1 +
 .../recipes/cache/TestPathChildrenCache.java | 166 ++++++++++++-------
 2 files changed, 106 insertions(+), 61 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/2f33fafb/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index ae30da9..568d03d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -640,6 +640,7 @@ public class PathChildrenCache implements Closeable
                 break;
             }
 
+            case CONNECTED:
             case RECONNECTED:
             {
                 try

http://git-wip-us.apache.org/repos/asf/curator/blob/2f33fafb/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 14d061f..887df54 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -16,56 +16,96 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.recipes.cache; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.Pathable; import org.apache.curator.framework.api.UnhandledErrorListener; -import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.KillSession; +import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; -import org.apache.log4j.Appender; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.SimpleLayout; -import org.apache.log4j.spi.LoggingEvent; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.testng.Assert; import org.testng.annotations.Test; - -import java.util.Collection; import 
java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.testng.AssertJUnit.assertNotNull; + public class TestPathChildrenCache extends BaseClassForTests { @Test + public void testWithBadConnect() throws Exception + { + final int serverPort = server.getPort(); + server.close(); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1)); + try + { + client.start(); + + final CountDownLatch ensurePathLatch = new CountDownLatch(1); + PathChildrenCache cache = new PathChildrenCache(client, "/", true) + { + @Override + protected void ensurePath() throws Exception + { + try + { + super.ensurePath(); + } + catch ( Exception e ) + { + ensurePathLatch.countDown(); + throw e; + } + } + }; + final CountDownLatch addedLatch = new CountDownLatch(1); + PathChildrenCacheListener listener = new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) + { + addedLatch.countDown(); + } + } + }; + cache.getListenable().addListener(listener); + cache.start(); + Assert.assertTrue(timing.awaitLatch(ensurePathLatch)); + + server = new TestingServer(serverPort, true); + + 
client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3}); + + assertNotNull("/baz does not exist", client.checkExists().forPath("/baz")); + + Assert.assertTrue(timing.awaitLatch(addedLatch)); + + assertNotNull("cache doesn't see /baz", cache.getCurrentData("/baz")); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testPostInitializedForEmpty() throws Exception { Timing timing = new Timing(); @@ -78,19 +118,19 @@ public class TestPathChildrenCache extends BaseClassForTests final CountDownLatch latch = new CountDownLatch(1); cache = new PathChildrenCache(client, "/test", true); cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + ( + new PathChildrenCacheListener() { - if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - latch.countDown(); + if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ) + { + latch.countDown(); + } } } - } - ); + ); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); Assert.assertTrue(timing.awaitLatch(latch)); } @@ -212,20 +252,20 @@ public class TestPathChildrenCache extends BaseClassForTests final CountDownLatch addedLatch = new CountDownLatch(3); cache.getListenable().addListener - ( - new PathChildrenCacheListener() + ( + new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + Assert.assertNotEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED); + if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - Assert.assertNotEquals(event.getType(), 
PathChildrenCacheEvent.Type.INITIALIZED); - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) - { - addedLatch.countDown(); - } - } + addedLatch.countDown(); } - ); + } + } + ); client.create().forPath("/test/1", "1".getBytes()); client.create().forPath("/test/2", "2".getBytes()); @@ -845,18 +885,19 @@ public class TestPathChildrenCache extends BaseClassForTests final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec); cache2.getListenable().addListener( - new PathChildrenCacheListener() { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) - throws Exception + new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) + throws Exception + { + if ( event.getData().getPath().equals("/test/one") ) { - if ( event.getData().getPath().equals("/test/one") ) - { - events2.offer(event.getType()); - } + events2.offer(event.getType()); } } - ); + } + ); cache2.start(); client.create().forPath("/test/one", "hey there".getBytes()); @@ -884,7 +925,7 @@ public class TestPathChildrenCache extends BaseClassForTests @Test public void testDeleteNodeAfterCloseDoesntCallExecutor() - throws Exception + throws Exception { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); @@ -911,7 +952,8 @@ public class TestPathChildrenCache extends BaseClassForTests timing.sleepABit(); Assert.assertFalse(exec.isExecuteCalled()); } - finally { + finally + { client.close(); } @@ -931,7 +973,8 @@ public class TestPathChildrenCache extends BaseClassForTests try { final CountDownLatch latch = new CountDownLatch(1); - final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) { + final 
PathChildrenCache cache = new PathChildrenCache(client, "/test", false) + { @Override protected void handleException(Throwable e) { @@ -957,7 +1000,8 @@ public class TestPathChildrenCache extends BaseClassForTests latch.await(5, TimeUnit.SECONDS); Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred"); - } finally + } + finally { CloseableUtils.closeQuietly(client); }
