CURATOR-161 - Modified to make the watcher type optional, defaulting to 'All'.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/389e0b0d Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/389e0b0d Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/389e0b0d Branch: refs/heads/CURATOR-3.0 Commit: 389e0b0d29899bd35ebd4d7c81302d3d8ee53889 Parents: 198ba68 Author: Cameron McKenzie <came...@unico.com.au> Authored: Mon May 11 13:14:40 2015 +1000 Committer: Cameron McKenzie <came...@unico.com.au> Committed: Mon May 11 13:14:40 2015 +1000 ---------------------------------------------------------------------- .../framework/api/RemoveWatchesType.java | 2 +- .../imps/RemoveWatchesBuilderImpl.java | 2 +- .../framework/imps/TestRemoveWatches.java | 218 ++++++++++++------- 3 files changed, 140 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/389e0b0d/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java index 3c58b7b..1123afd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java @@ -6,7 +6,7 @@ import org.apache.zookeeper.Watcher.WatcherType; * Builder to allow the specification of whether it is acceptable to remove client side watch information * in the case where ZK cannot be contacted. */ -public interface RemoveWatchesType +public interface RemoveWatchesType extends RemoveWatchesLocal { /** http://git-wip-us.apache.org/repos/asf/curator/blob/389e0b0d/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index 4354653..5a34f7d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -35,7 +35,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat { this.client = client; this.watcher = null; - this.watcherType = null; + this.watcherType = WatcherType.Any; this.local = false; this.quietly = false; this.backgrounding = new Backgrounding(); http://git-wip-us.apache.org/repos/asf/curator/blob/389e0b0d/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index 39967c9..414c819 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -1,6 +1,7 @@ package org.apache.curator.framework.imps; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.framework.CuratorFramework; @@ -53,7 +54,7 @@ public class TestRemoveWatches extends BaseClassForTests client.checkExists().watched().forPath(path); - client.watches().removeAll().ofType(WatcherType.Data).forPath(path); + client.watches().removeAll().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -92,7 +93,7 @@ public class TestRemoveWatches extends BaseClassForTests client.checkExists().usingWatcher(watcher).forPath(path); - client.watches().remove(watcher).ofType(WatcherType.Data).forPath(path); + client.watches().remove(watcher).forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -117,20 +118,11 @@ public class TestRemoveWatches extends BaseClassForTests final CountDownLatch removedLatch = new CountDownLatch(1); final String path = "/"; - Watcher watcher = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - client.watches().remove(watcher).ofType(WatcherType.Data).forPath(path); + client.watches().remove(watcher).forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -155,16 +147,7 @@ public class TestRemoveWatches extends BaseClassForTests //Make sure that the event fires on both the watcher and the callback. final CountDownLatch removedLatch = new CountDownLatch(2); final String path = "/"; - Watcher watcher = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); BackgroundCallback callback = new BackgroundCallback() { @@ -207,20 +190,11 @@ public class TestRemoveWatches extends BaseClassForTests final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(1); - Watcher watcher = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - client.watches().remove(watcher).ofType(WatcherType.Any).inBackground().forPath(path); + client.watches().remove(watcher).inBackground().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); @@ -246,41 +220,87 @@ public class TestRemoveWatches extends BaseClassForTests final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(2); - Watcher watcher1 = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); - Watcher watcher2 = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + client.getChildren().usingWatcher(watcher1).forPath(path); + client.checkExists().usingWatcher(watcher2).forPath(path); + client.watches().removeAll().forPath(path); - client.checkExists().usingWatcher(watcher1).forPath(path); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRemoveAllDataWatches() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + final String path = "/"; + final AtomicBoolean removedFlag = new AtomicBoolean(false); + final CountDownLatch removedLatch = new CountDownLatch(1); + + Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); + + client.getChildren().usingWatcher(watcher1).forPath(path); client.checkExists().usingWatcher(watcher2).forPath(path); - client.watches().removeAll().ofType(WatcherType.Any).forPath(path); + client.watches().removeAll().ofType(WatcherType.Data).forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); + Assert.assertEquals(removedFlag.get(), false); } finally { CloseableUtils.closeQuietly(client); } - } + } + + @Test + public void testRemoveAllChildWatches() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.builder(). + connectString(server.getConnectString()). + retryPolicy(new RetryOneTime(1)). + build(); + try + { + client.start(); + + final String path = "/"; + final AtomicBoolean removedFlag = new AtomicBoolean(false); + final CountDownLatch removedLatch = new CountDownLatch(1); + + Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved); + Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved); + + client.checkExists().usingWatcher(watcher1).forPath(path); + client.getChildren().usingWatcher(watcher2).forPath(path); + + client.watches().removeAll().ofType(WatcherType.Children).forPath(path); + + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); + Assert.assertEquals(removedFlag.get(), false); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } /** * TODO: THIS IS STILL A WORK IN PROGRESS. local() is currently broken if no connection to ZK is available. @@ -301,20 +321,7 @@ public class TestRemoveWatches extends BaseClassForTests final CountDownLatch removedLatch = new CountDownLatch(1); - Watcher watcher = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if(event.getPath() == null || event.getType() == null) { - return; - } - - if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) { - removedLatch.countDown(); - } - } - }; + Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); @@ -323,7 +330,7 @@ public class TestRemoveWatches extends BaseClassForTests timing.sleepABit(); - client.watches().removeAll().ofType(WatcherType.Any).locally().forPath(path); + client.watches().removeAll().locally().forPath(path); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -357,7 +364,7 @@ public class TestRemoveWatches extends BaseClassForTests } }; - client.watches().remove(watcher).ofType(WatcherType.Data).forPath(path); + client.watches().remove(watcher).forPath(path); } finally { @@ -372,6 +379,7 @@ public class TestRemoveWatches extends BaseClassForTests @Test public void testRemoveUnregisteredWatcherQuietly() throws Exception { + Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). @@ -380,19 +388,69 @@ public class TestRemoveWatches extends BaseClassForTests { client.start(); + final AtomicBoolean watcherRemoved = new AtomicBoolean(false); + final String path = "/"; - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) - { - } - }; + Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved); - client.watches().remove(watcher).ofType(WatcherType.Data).quietly().forPath(path); + client.watches().remove(watcher).quietly().forPath(path); + + timing.sleepABit(); + + //There should be no watcher removed as none were registered. + Assert.assertEquals(watcherRemoved.get(), false); } finally { CloseableUtils.closeQuietly(client); } + } + + private static class CountDownWatcher implements Watcher { + private String path; + private EventType eventType; + private CountDownLatch removeLatch; + + public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) { + this.path = path; + this.eventType = eventType; + this.removeLatch = removeLatch; + } + + @Override + public void process(WatchedEvent event) + { + if(event.getPath() == null || event.getType() == null) { + return; + } + + if(event.getPath().equals(path) && event.getType() == eventType) { + removeLatch.countDown(); + } + } + } + + private static class BooleanWatcher implements Watcher { + private String path; + private EventType eventType; + private AtomicBoolean removedFlag; + + public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) { + this.path = path; + this.eventType = eventType; + this.removedFlag = removedFlag; + } + + @Override + public void process(WatchedEvent event) + { + if(event.getPath() == null || event.getType() == null) { + return; + } + + if(event.getPath().equals(path) && event.getType() == eventType) { + removedFlag.set(true); + } + } } }