Repository: curator Updated Branches: refs/heads/CURATOR-3.0 75325d4ae -> d26c38dba
Make sure NamespaceWatcherMap is cleared when the corresponding watcher is removed via new APIs. Added tests to ensure this. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/adb4be47 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/adb4be47 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/adb4be47 Branch: refs/heads/CURATOR-3.0 Commit: adb4be47ca6e64962aae3067412cc76aa4b0cd22 Parents: ae8dc46 Author: randgalt <[email protected]> Authored: Mon Jan 18 17:53:27 2016 -0500 Committer: randgalt <[email protected]> Committed: Mon Jan 18 17:53:27 2016 -0500 ---------------------------------------------------------------------- .../org/apache/curator/utils/DebugUtils.java | 1 + .../framework/imps/NamespaceWatcherMap.java | 11 ++++ .../imps/RemoveWatchesBuilderImpl.java | 10 ++- .../framework/imps/WatcherRemovalFacade.java | 11 +++- .../framework/imps/WatcherRemovalManager.java | 5 +- .../framework/imps/TestRemoveWatches.java | 67 ++++++++++++-------- .../framework/recipes/cache/TreeCache.java | 2 +- .../apache/curator/test/BaseClassForTests.java | 14 ++++ 8 files changed, 88 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java index 03f6903..beea726 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java @@ -25,6 +25,7 @@ public class DebugUtils public static final String PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems"; public static final String PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level"; public static final String PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = "curator-remove-watchers-in-foreground"; + public static final String PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = "curator-validate-namespace-watcher-map-empty"; private DebugUtils() { http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java index e5aecb2..00618e6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java @@ -42,6 +42,11 @@ class NamespaceWatcherMap implements Closeable @Override public void close() { + clear(); + } + + void clear() + { map.clear(); } @@ -71,6 +76,12 @@ class NamespaceWatcherMap implements Closeable return map.remove(key); } + boolean removeWatcher(Object watcher) + { + //noinspection SuspiciousMethodCalls + return map.values().remove(watcher); + } + @VisibleForTesting boolean isEmpty() { http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index f2666e6..c1772f1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -231,10 +231,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat ZooKeeper zkClient = client.getZooKeeper(); if(watcher == null) { + client.getNamespaceWatcherMap().clear(); zkClient.removeAllWatches(path, watcherType, local); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(path, watcher, watcherType, local); } } @@ -252,10 +254,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat if(watcher == null) { - zkClient.removeAllWatches(path, watcherType, local); + client.getNamespaceWatcherMap().clear(); + zkClient.removeAllWatches(path, watcherType, local); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(path, watcher, watcherType, local); } } @@ -304,10 +308,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat ZooKeeper zkClient = client.getZooKeeper(); if(watcher == null) { - zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); + client.getNamespaceWatcherMap().clear(); + zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext()); } http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index 371fc63..91530b4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -41,7 +42,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove { super(client); this.client = client; - removalManager = new WatcherRemovalManager(client); + removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap()); } @Override @@ -65,6 +66,14 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove public void removeWatchers() { removalManager.removeWatchers(); + + if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) ) + { + if ( !getNamespaceWatcherMap().isEmpty() ) + { + throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap()); + } + } } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java index a691a94..064964d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java @@ -32,11 +32,13 @@ public class WatcherRemovalManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; + private final NamespaceWatcherMap namespaceWatcherMap; private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync - WatcherRemovalManager(CuratorFrameworkImpl client) + WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap) { this.client = client; + this.namespaceWatcherMap = namespaceWatcherMap; } synchronized Watcher add(String path, Watcher watcher) @@ -67,6 +69,7 @@ public class WatcherRemovalManager try { log.debug("Removing watcher for path: " + entry.path); + namespaceWatcherMap.removeWatcher(entry.watcher); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); builder.internalRemoval(entry, entry.path); } http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index 4e02e95..a7c137a 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -18,12 +18,6 @@ */ package org.apache.curator.framework.imps; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; @@ -46,6 +40,9 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TestRemoveWatches extends BaseClassForTests { @@ -75,7 +72,8 @@ public class TestRemoveWatches extends BaseClassForTests { return true; } - + + //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized(stateRef) { if(stateRef.get() == desiredState) @@ -139,7 +137,7 @@ public class TestRemoveWatches extends BaseClassForTests public void testRemoveCuratorWatch() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -163,9 +161,11 @@ public class TestRemoveWatches extends BaseClassForTests }; client.checkExists().usingWatcher(watcher).forPath(path); + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); client.watches().remove(watcher).forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally @@ -178,7 +178,7 @@ public class TestRemoveWatches extends BaseClassForTests public void testRemoveWatch() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -192,9 +192,11 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + client.watches().remove(watcher).forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally @@ -207,7 +209,7 @@ public class TestRemoveWatches extends BaseClassForTests public void testRemoveWatchInBackgroundWithCallback() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -233,11 +235,12 @@ public class TestRemoveWatches extends BaseClassForTests } }; - client.checkExists().usingWatcher(watcher).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -251,7 +254,7 @@ public class TestRemoveWatches extends BaseClassForTests public void testRemoveWatchInBackgroundWithNoCallback() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -264,9 +267,11 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + client.watches().remove(watcher).inBackground().forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -280,7 +285,7 @@ public class TestRemoveWatches extends BaseClassForTests public void testRemoveAllWatches() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -296,9 +301,11 @@ public class TestRemoveWatches extends BaseClassForTests client.getChildren().usingWatcher(watcher1).forPath(path); client.checkExists().usingWatcher(watcher2).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + client.watches().removeAll().forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally @@ -376,7 +383,7 @@ public class TestRemoveWatches extends BaseClassForTests @Test public void testRemoveLocalWatch() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -393,14 +400,16 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + //Stop the server so we can check if we can remove watches locally when offline server.stop(); Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally @@ -412,7 +421,7 @@ public class TestRemoveWatches extends BaseClassForTests @Test public void testRemoveLocalWatchInBackground() throws Exception { Timing timing = new Timing(); - CuratorFramework client = CuratorFrameworkFactory.builder(). + CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder(). connectString(server.getConnectString()). retryPolicy(new RetryOneTime(1)). build(); @@ -429,14 +438,16 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - + Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); + //Stop the server so we can check if we can remove watches locally when offline server.stop(); Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().inBackground().forPath(path); - + Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); + Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } finally http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index 81590f7..a2f0e86 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -305,7 +305,7 @@ public class TreeCache implements Closeable { Stat oldStat = stat.getAndSet(null); byte[] oldData = data.getAndSet(null); - client.watches().remove(this).ofType(WatcherType.Any).inBackground().forPath(path); + client.watches().remove(this).ofType(WatcherType.Any).locally().inBackground().forPath(path); ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null); if ( childMap != null ) http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index da1607c..a5afaf2 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -41,6 +41,7 @@ public class BaseClassForTests private static final int RETRY_WAIT_MS = 5000; private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES; private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND; + private static final String INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY; static { @@ -67,6 +68,17 @@ public class BaseClassForTests e.printStackTrace(); } INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = s; + s = null; + try + { + // use reflection to avoid adding a circular dependency in the pom + s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY").get(null); + } + catch ( Exception e ) + { + e.printStackTrace(); + } + INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = s; } @BeforeSuite(alwaysRun = true) @@ -107,6 +119,7 @@ public class BaseClassForTests System.setProperty(INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true"); } System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true"); + System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true"); while ( server == null ) { @@ -125,6 +138,7 @@ public class BaseClassForTests @AfterMethod public void teardown() throws Exception { + System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY); System.clearProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND); if ( server != null ) {
