Trying to make tests more reliable
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b25a8a35 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b25a8a35 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b25a8a35 Branch: refs/heads/CURATOR-3.0 Commit: b25a8a35856abf9710d42fae0a7324fbe66c362d Parents: 967faf1 Author: randgalt <[email protected]> Authored: Sat Oct 10 15:15:50 2015 -0500 Committer: randgalt <[email protected]> Committed: Sat Oct 10 15:15:50 2015 -0500 ---------------------------------------------------------------------- .../recipes/cache/PathChildrenCache.java | 6 +- .../framework/recipes/cache/TreeCache.java | 21 +- .../recipes/cache/TestPathChildrenCache.java | 432 +++++++++---------- 3 files changed, 220 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 99a652d..e4e18d9 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -315,7 +315,7 @@ public class PathChildrenCache implements Closeable */ public void rebuild() throws Exception { - Preconditions.checkState(!executorService.isShutdown(), "cache has been closed"); + Preconditions.checkState(state.get() == State.STARTED, "cache has been closed"); ensurePath(); @@ -347,7 +347,7 @@ public class PathChildrenCache implements Closeable public void rebuildNode(String fullPath) throws Exception { Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath); - Preconditions.checkState(!executorService.isShutdown(), "cache has been closed"); + Preconditions.checkState(state.get() == State.STARTED, "cache has been closed"); ensurePath(); internalRebuildNode(fullPath); @@ -370,8 +370,6 @@ public class PathChildrenCache implements Closeable client.getConnectionStateListenable().removeListener(connectionStateListener); listeners.clear(); executorService.close(); - client.clearWatcherReferences(childrenWatcher); - client.clearWatcherReferences(dataWatcher); client.removeWatchers(); // TODO http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index bda00bf..8030e8b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -248,18 +248,24 @@ public class TreeCache implements Closeable private void doRefreshChildren() throws Exception { - client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + if ( treeState.get() == TreeState.STARTED ) + { + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } } private void doRefreshData() throws Exception { - if ( dataIsCompressed ) - { - client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); - } - else + if ( treeState.get() == TreeState.STARTED ) { - client.getData().usingWatcher(this).inBackground(this).forPath(path); + if ( dataIsCompressed ) + { + client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); + } + else + { + client.getData().usingWatcher(this).inBackground(this).forPath(path); + } } } @@ -285,7 +291,6 @@ public class TreeCache implements Closeable { stat.set(null); data.set(null); - client.clearWatcherReferences(this); ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null); if ( childMap != null ) { http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index 3571ca7..a4e2b2e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -18,15 +18,9 @@ */ package org.apache.curator.framework.recipes.cache; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.Pathable; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryOneTime; @@ -35,31 +29,12 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; -import org.apache.log4j.Appender; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.SimpleLayout; -import org.apache.log4j.spi.LoggingEvent; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.testng.Assert; import org.testng.annotations.Test; - -import java.util.Collection; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -348,48 +323,48 @@ public class TestPathChildrenCache extends BaseClassForTests final CountDownLatch removedLatch = new CountDownLatch(1); final CountDownLatch postRemovedLatch = new CountDownLatch(1); final CountDownLatch dataLatch = new CountDownLatch(1); - PathChildrenCache cache = new PathChildrenCache(client, "/test", true); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) ) + { + cache.getListenable().addListener + ( + new PathChildrenCacheListener() { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) - { - removedLatch.countDown(); - Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS)); - } - else + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - try + if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ) { - Assert.assertEquals(event.getData().getData(), "two".getBytes()); + removedLatch.countDown(); + Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS)); } - finally + else { - dataLatch.countDown(); + try + { + Assert.assertEquals(event.getData().getData(), "two".getBytes()); + } + finally + { + dataLatch.countDown(); + } } } } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + ); + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - client.delete().forPath("/test/foo"); - Assert.assertTrue(timing.awaitLatch(removedLatch)); - client.create().forPath("/test/foo", "two".getBytes()); - postRemovedLatch.countDown(); - Assert.assertTrue(timing.awaitLatch(dataLatch)); + client.delete().forPath("/test/foo"); + Assert.assertTrue(timing.awaitLatch(removedLatch)); + client.create().forPath("/test/foo", "two".getBytes()); + postRemovedLatch.countDown(); + Assert.assertTrue(timing.awaitLatch(dataLatch)); - Throwable t = error.get(); - if ( t != null ) - { - Assert.fail("Assert", t); + Throwable t = error.get(); + if ( t != null ) + { + Assert.fail("Assert", t); + } } - - cache.close(); } finally { @@ -411,79 +386,79 @@ public class TestPathChildrenCache extends BaseClassForTests client.create().forPath("/test/snafu", "original".getBytes()); final CountDownLatch addedLatch = new CountDownLatch(2); - final PathChildrenCache cache = new PathChildrenCache(client, "/test", true); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", true) ) + { + cache.getListenable().addListener + ( + new PathChildrenCacheListener() { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - if ( event.getData().getPath().equals("/test/test") ) + if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) { - addedLatch.countDown(); + if ( event.getData().getPath().equals("/test/test") ) + { + addedLatch.countDown(); + } } - } - else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) - { - if ( event.getData().getPath().equals("/test/snafu") ) + else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ) { - addedLatch.countDown(); + if ( event.getData().getPath().equals("/test/snafu") ) + { + addedLatch.countDown(); + } } } } - } - ); - cache.rebuildTestExchanger = new Exchanger<Object>(); - ExecutorService service = Executors.newSingleThreadExecutor(); - final AtomicReference<String> deletedPath = new AtomicReference<String>(); - Future<Object> future = service.submit - ( - new Callable<Object>() - { - @Override - public Object call() throws Exception + ); + cache.rebuildTestExchanger = new Exchanger<Object>(); + ExecutorService service = Executors.newSingleThreadExecutor(); + final AtomicReference<String> deletedPath = new AtomicReference<String>(); + Future<Object> future = service.submit + ( + new Callable<Object>() { - cache.rebuildTestExchanger.exchange(new Object()); + @Override + public Object call() throws Exception + { + cache.rebuildTestExchanger.exchange(new Object()); - // simulate another process adding a node while we're rebuilding - client.create().forPath("/test/test"); + // simulate another process adding a node while we're rebuilding + client.create().forPath("/test/test"); - List<ChildData> currentData = cache.getCurrentData(); - Assert.assertTrue(currentData.size() > 0); + List<ChildData> currentData = cache.getCurrentData(); + Assert.assertTrue(currentData.size() > 0); - // simulate another process removing a node while we're rebuilding - client.delete().forPath(currentData.get(0).getPath()); - deletedPath.set(currentData.get(0).getPath()); + // simulate another process removing a node while we're rebuilding + client.delete().forPath(currentData.get(0).getPath()); + deletedPath.set(currentData.get(0).getPath()); - cache.rebuildTestExchanger.exchange(new Object()); + cache.rebuildTestExchanger.exchange(new Object()); - ChildData childData = null; - while ( childData == null ) - { - childData = cache.getCurrentData("/test/snafu"); - Thread.sleep(1000); - } - Assert.assertEquals(childData.getData(), "original".getBytes()); - client.setData().forPath("/test/snafu", "grilled".getBytes()); + ChildData childData = null; + while ( childData == null ) + { + childData = cache.getCurrentData("/test/snafu"); + Thread.sleep(1000); + } + Assert.assertEquals(childData.getData(), "original".getBytes()); + client.setData().forPath("/test/snafu", "grilled".getBytes()); - cache.rebuildTestExchanger.exchange(new Object()); + cache.rebuildTestExchanger.exchange(new Object()); - return null; + return null; + } } - } - ); - cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - future.get(); - - Assert.assertTrue(timing.awaitLatch(addedLatch)); - Assert.assertNotNull(cache.getCurrentData("/test/test")); - Assert.assertNull(cache.getCurrentData(deletedPath.get())); - Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes()); + ); + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + future.get(); - cache.close(); + Assert.assertTrue(timing.awaitLatch(addedLatch)); + Assert.assertNotNull(cache.getCurrentData("/test/test")); + Assert.assertNull(cache.getCurrentData(deletedPath.get())); + Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes()); + } } finally { @@ -653,7 +628,7 @@ public class TestPathChildrenCache extends BaseClassForTests client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); Assert.assertTrue(timing.awaitLatch(childAddedLatch)); - KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); + KillSession.kill(client.getZookeeperClient().getZooKeeper()); Assert.assertTrue(timing.awaitLatch(lostLatch)); Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); Assert.assertTrue(timing.awaitLatch(removedLatch)); @@ -695,9 +670,9 @@ public class TestPathChildrenCache extends BaseClassForTests Timing timing = new Timing(); PathChildrenCache cache = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - client.start(); try { + client.start(); client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes()); final CountDownLatch latch = new CountDownLatch(1); @@ -716,7 +691,7 @@ public class TestPathChildrenCache extends BaseClassForTests }; cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - latch.await(); + Assert.assertTrue(timing.awaitLatch(latch)); int saveCounter = counter.get(); client.setData().forPath("/test/one", "alt".getBytes()); @@ -725,6 +700,7 @@ public class TestPathChildrenCache extends BaseClassForTests Assert.assertEquals(saveCounter, counter.get()); semaphore.release(1000); + timing.sleepABit(); } finally { @@ -735,44 +711,43 @@ public class TestPathChildrenCache extends BaseClassForTests private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception { - PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData); - - final CountDownLatch latch = new CountDownLatch(2); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) ) + { + final CountDownLatch latch = new CountDownLatch(2); + cache.getListenable().addListener + ( + new PathChildrenCacheListener() { - if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - latch.countDown(); + if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ) + { + latch.countDown(); + } } } - } - ); - cache.start(); + ); + cache.start(); - client.create().forPath("/test/one", "one".getBytes()); - client.create().forPath("/test/two", "two".getBytes()); - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + client.create().forPath("/test/one", "one".getBytes()); + client.create().forPath("/test/two", "two".getBytes()); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - for ( ChildData data : cache.getCurrentData() ) - { - if ( cacheData ) - { - Assert.assertNotNull(data.getData()); - Assert.assertNotNull(data.getStat()); - } - else + for ( ChildData data : cache.getCurrentData() ) { - Assert.assertNull(data.getData()); - Assert.assertNotNull(data.getStat()); + if ( cacheData ) + { + Assert.assertNotNull(data.getData()); + Assert.assertNotNull(data.getStat()); + } + else + { + Assert.assertNull(data.getData()); + Assert.assertNotNull(data.getStat()); + } } } - - cache.close(); } @Test @@ -786,34 +761,34 @@ public class TestPathChildrenCache extends BaseClassForTests client.create().forPath("/test"); final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); - PathChildrenCache cache = new PathChildrenCache(client, "/test", true); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) ) + { + cache.getListenable().addListener + ( + new PathChildrenCacheListener() { - if ( event.getData().getPath().equals("/test/one") ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - events.offer(event.getType()); + if ( event.getData().getPath().equals("/test/one") ) + { + events.offer(event.getType()); + } } } - } - ); - cache.start(); - - client.create().forPath("/test/one", "hey there".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + ); + cache.start(); - client.setData().forPath("/test/one", "sup!".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); - Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + client.create().forPath("/test/one", "hey there".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); - client.delete().forPath("/test/one"); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + client.setData().forPath("/test/one", "sup!".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); - cache.close(); + client.delete().forPath("/test/one"); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + } } finally { @@ -833,56 +808,58 @@ public class TestPathChildrenCache extends BaseClassForTests final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); final ExecutorService exec = Executors.newSingleThreadExecutor(); - PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec); - cache.getListenable().addListener - ( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) ) + { + cache.getListenable().addListener + ( + new PathChildrenCacheListener() { - if ( event.getData().getPath().equals("/test/one") ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - events.offer(event.getType()); + if ( event.getData().getPath().equals("/test/one") ) + { + events.offer(event.getType()); + } } } - } - ); - cache.start(); + ); + cache.start(); - final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); - PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec); - cache2.getListenable().addListener( - new PathChildrenCacheListener() { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) - throws Exception + final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); + try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) ) + { + cache2.getListenable().addListener( + new PathChildrenCacheListener() { - if ( event.getData().getPath().equals("/test/one") ) + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) + throws Exception { - events2.offer(event.getType()); + if ( event.getData().getPath().equals("/test/one") ) + { + events2.offer(event.getType()); + } } } - } - ); - cache2.start(); - - client.create().forPath("/test/one", "hey there".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); - - client.setData().forPath("/test/one", "sup!".getBytes()); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); - Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); - Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); - - client.delete().forPath("/test/one"); - Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); - Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); - - cache.close(); - cache2.close(); + ); + cache2.start(); + + client.create().forPath("/test/one", "hey there".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + + client.setData().forPath("/test/one", "sup!".getBytes()); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); + Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!"); + + client.delete().forPath("/test/one"); + Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED); + } + } } finally { @@ -902,17 +879,17 @@ public class TestPathChildrenCache extends BaseClassForTests client.create().forPath("/test"); final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); - PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec); - - cache.start(); - client.create().forPath("/test/one", "hey there".getBytes()); + try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) ) + { + cache.start(); + client.create().forPath("/test/one", "hey there".getBytes()); - cache.rebuild(); - Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); - Assert.assertTrue(exec.isExecuteCalled()); + cache.rebuild(); + Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); + Assert.assertTrue(exec.isExecuteCalled()); - exec.setExecuteCalled(false); - cache.close(); + exec.setExecuteCalled(false); + } Assert.assertFalse(exec.isExecuteCalled()); client.delete().forPath("/test/one"); @@ -940,28 +917,29 @@ public class TestPathChildrenCache extends BaseClassForTests try { final CountDownLatch latch = new CountDownLatch(1); - final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) { + try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) { @Override protected void handleException(Throwable e) { latch.countDown(); } - }; - cache.start(); - - cache.offerOperation(new Operation() + } ) { + cache.start(); - @Override - public void invoke() throws Exception + cache.offerOperation(new Operation() { - Thread.sleep(5000); - } - }); - Thread.sleep(1000); + @Override + public void invoke() throws Exception + { + Thread.sleep(5000); + } + }); - cache.close(); + Thread.sleep(1000); + + } latch.await(5, TimeUnit.SECONDS);
