more tests, refinements
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/49b2fd3a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/49b2fd3a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/49b2fd3a Branch: refs/heads/CURATOR-3.0 Commit: 49b2fd3a8313cd05292e2ca8edb4b14b08f0de55 Parents: d3672a5 Author: randgalt <[email protected]> Authored: Mon May 11 18:04:01 2015 -0500 Committer: randgalt <[email protected]> Committed: Mon May 11 18:04:01 2015 -0500 ---------------------------------------------------------------------- .../framework/imps/WatcherRemovalFacade.java | 5 ++ .../framework/imps/WatcherRemovalManager.java | 90 ++++++++++++++++---- .../apache/curator/framework/imps/Watching.java | 2 +- .../imps/TestWatcherRemovalManager.java | 68 +++++++++++++++ .../curator/framework/imps/TestCleanState.java | 5 ++ 5 files changed, 151 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/49b2fd3a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index 664c9b0..eee423f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -48,6 +48,11 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove throw new UnsupportedOperationException(); } + WatcherRemovalManager getRemovalManager() + { + return removalManager; + } + @Override public void removeWatchers() { http://git-wip-us.apache.org/repos/asf/curator/blob/49b2fd3a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java index 689ade2..5a705a4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java @@ -18,68 +18,122 @@ */ package org.apache.curator.framework.imps; -import com.google.common.collect.Maps; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; class WatcherRemovalManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; - private final Map<Watcher, String> entries = Maps.newConcurrentMap(); + private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync WatcherRemovalManager(CuratorFrameworkImpl client) { this.client = client; } - Watcher add(String path, Watcher watcher) + synchronized Watcher add(String path, Watcher watcher) { - Watcher wrappedWatcher = new WrappedWatcher(entries, watcher); - entries.put(wrappedWatcher, path); + path = Preconditions.checkNotNull(path, "path cannot be null"); + watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null"); + + WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path); + entries.add(wrappedWatcher); return wrappedWatcher; } + @VisibleForTesting + synchronized Set<? extends Watcher> getEntries() + { + return Sets.newHashSet(entries); + } + void removeWatchers() { - for ( Map.Entry<Watcher, String> entry : entries.entrySet() ) + HashSet<WrappedWatcher> localEntries; + synchronized(this) + { + localEntries = Sets.newHashSet(entries); + } + for ( WrappedWatcher entry : localEntries ) { - Watcher watcher = entry.getKey(); - String path = entry.getValue(); try { - log.debug("Removing watcher for path: " + path); + log.debug("Removing watcher for path: " + entry.path); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); - builder.prepInternalRemoval(watcher); - builder.pathInForeground(path); + builder.prepInternalRemoval(entry); + builder.pathInForeground(entry.path); } catch ( Exception e ) { - String message = "Could not remove watcher for path: " + path; + String message = "Could not remove watcher for path: " + entry.path; log.error(message); } } } - private static class WrappedWatcher implements Watcher + private synchronized void internalRemove(WrappedWatcher entry) + { + entries.remove(entry); + } + + private class WrappedWatcher implements Watcher { - private final Map<Watcher, String> entries; private final Watcher watcher; + private final String path; - WrappedWatcher(Map<Watcher, String> entries, Watcher watcher) + WrappedWatcher(Watcher watcher, String path) { - this.entries = entries; this.watcher = watcher; + this.path = path; } @Override public void process(WatchedEvent event) { - entries.remove(this); + if ( event.getType() != Event.EventType.None ) + { + internalRemove(this); + } watcher.process(event); } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + WrappedWatcher entry = (WrappedWatcher)o; + + //noinspection SimplifiableIfStatement + if ( !watcher.equals(entry.watcher) ) + { + return false; + } + return path.equals(entry.path); + + } + + @Override + public int hashCode() + { + int result = watcher.hashCode(); + result = 31 * result + path.hashCode(); + return result; + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/49b2fd3a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index ae16dfc..4bebbd5 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -52,7 +52,7 @@ class Watching Watcher getWatcher(CuratorFrameworkImpl client, String unfixedPath) { - if ( client.getWatcherRemovalManager() != null ) + if ( (watcher != null) && (client.getWatcherRemovalManager() != null) ) { return client.getWatcherRemovalManager().add(unfixedPath, watcher); } http://git-wip-us.apache.org/repos/asf/curator/blob/49b2fd3a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index 6e28bea..d951c57 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -31,6 +31,7 @@ import org.apache.zookeeper.Watcher; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; +import java.util.concurrent.CountDownLatch; public class TestWatcherRemovalManager extends BaseClassForTests { @@ -102,6 +103,73 @@ public class TestWatcherRemovalManager extends BaseClassForTests } } + @Test + public void testSameWatcher() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + + removerClient.getData().usingWatcher(watcher).forPath("/"); + Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1); + removerClient.getData().usingWatcher(watcher).forPath("/"); + Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testTriggered() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + + final CountDownLatch latch = new CountDownLatch(1); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + if ( event.getType() == Event.EventType.NodeCreated ) + { + latch.countDown(); + } + } + }; + + removerClient.checkExists().usingWatcher(watcher).forPath("/yo"); + Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1); + removerClient.create().forPath("/yo"); + + Assert.assertTrue(new Timing().awaitLatch(latch)); + + Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 0); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + private void internalTryBasic(CuratorFramework client) throws Exception { WatcherRemoveCuratorFramework removerClient = client.newWatcherRemoveCuratorFramework(); http://git-wip-us.apache.org/repos/asf/curator/blob/49b2fd3a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java index 95a1088..8ca8409 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java @@ -35,6 +35,11 @@ public class TestCleanState try { CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client; + if ( !internalClient.getNamespaceWatcherMap().isEmpty() ) + { + throw new AssertionError("NamespaceWatcherMap is not empty"); + } + ZooKeeper zooKeeper = internalClient.getZooKeeper(); if ( zooKeeper != null ) {
