Repository: curator Updated Branches: refs/heads/CURATOR-3.0 822890844 -> 0ed6f662c
Major re-work of Watcher wrappers. It really isn't necessary to keep a map of watchers. It was originally done so that watcher identity was maintained, but this can be achieved much easier using special-purpose hashCode() and equals() on NamespaceWatcher. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ff8a795e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ff8a795e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ff8a795e Branch: refs/heads/CURATOR-3.0 Commit: ff8a795e61d0d44622bdbaf2144c25c70e31e864 Parents: 4e4072b Author: randgalt <[email protected]> Authored: Thu Feb 4 18:57:35 2016 -0500 Committer: randgalt <[email protected]> Committed: Thu Feb 4 18:57:35 2016 -0500 ---------------------------------------------------------------------- .../WatcherRemoveCuratorFramework.java | 2 + .../framework/imps/CuratorFrameworkImpl.java | 14 +-- .../framework/imps/ExistsBuilderImpl.java | 4 +- .../framework/imps/GetChildrenBuilderImpl.java | 4 +- .../framework/imps/GetConfigBuilderImpl.java | 4 +- .../framework/imps/GetDataBuilderImpl.java | 4 +- .../framework/imps/NamespaceWatcher.java | 61 +++++++++- .../framework/imps/NamespaceWatcherMap.java | 114 ------------------- .../imps/RemoveWatchesBuilderImpl.java | 99 ++++++++-------- .../framework/imps/WatcherRemovalFacade.java | 12 +- .../framework/imps/WatcherRemovalManager.java | 86 ++------------ .../apache/curator/framework/imps/Watching.java | 38 +++++-- .../framework/imps/TestFrameworkEdges.java | 15 +++ .../framework/imps/TestRemoveWatches.java | 16 +-- .../framework/imps/TestWatcherIdentity.java | 83 +++----------- 15 files changed, 196 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java index 871b53c..1492898 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java @@ -18,6 +18,8 @@ */ package org.apache.curator.framework; +import org.apache.zookeeper.Watcher; + /** * A CuratorFramework facade that tracks watchers created and allows a one-shot removal of all watchers */ http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 2b8bbbc..191c50a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -22,7 +22,6 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import org.apache.curator.CuratorConnectionLossException; import org.apache.curator.CuratorZookeeperClient; @@ -85,7 +84,6 @@ public class CuratorFrameworkImpl implements CuratorFramework private final CompressionProvider compressionProvider; private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; - private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; private final ConnectionStateErrorPolicy connectionStateErrorPolicy; private final AtomicLong currentInstanceIndex = new AtomicLong(-1); @@ -240,11 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework @Override public void clearWatcherReferences(Watcher watcher) { - NamespaceWatcher namespaceWatcher = namespaceWatcherMap.remove(watcher); - if ( namespaceWatcher != null ) - { - namespaceWatcher.close(); - } + // NOP } @Override @@ -371,7 +365,6 @@ public class CuratorFrameworkImpl implements CuratorFramework unhandledErrorListeners.clear(); connectionStateManager.close(); client.close(); - namespaceWatcherMap.close(); } } @@ -700,11 +693,6 @@ public class CuratorFrameworkImpl implements CuratorFramework return namespaceFacadeCache; } - NamespaceWatcherMap getNamespaceWatcherMap() - { - return namespaceWatcherMap; - } - void validateConnection(Watcher.Event.KeeperState state) { if ( state == Watcher.Event.KeeperState.Disconnected ) http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java index a6316ac..4b3d214 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java @@ -68,14 +68,14 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String> @Override public BackgroundPathable<Stat> usingWatcher(Watcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } @Override public BackgroundPathable<Stat> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java index 7929ba3..8365585 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java @@ -137,14 +137,14 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< @Override public BackgroundPathable<List<String>> usingWatcher(Watcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } @Override public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 09cb0ab..b64f38e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -129,14 +129,14 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati @Override public BackgroundEnsembleable<byte[]> usingWatcher(Watcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return new InternalBackgroundEnsembleable(); } @Override public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return new InternalBackgroundEnsembleable(); } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 7a731d3..e2aa053 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -218,14 +218,14 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String> @Override public BackgroundPathable<byte[]> usingWatcher(Watcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } @Override public BackgroundPathable<byte[]> usingWatcher(CuratorWatcher watcher) { - watching = new Watching(client, watcher); + watching = new Watching(watcher); return this; } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java index 1cb9125..f9af7ca 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.imps; +import com.google.common.base.Preconditions; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.WatchedEvent; @@ -28,20 +29,28 @@ class NamespaceWatcher implements Watcher, Closeable { private volatile CuratorFrameworkImpl client; private volatile Watcher actualWatcher; + private final String unfixedPath; private volatile CuratorWatcher curatorWatcher; - NamespaceWatcher(CuratorFrameworkImpl client, Watcher actualWatcher) + NamespaceWatcher(CuratorFrameworkImpl client, Watcher actualWatcher, String unfixedPath) { this.client = client; this.actualWatcher = actualWatcher; + this.unfixedPath = Preconditions.checkNotNull(unfixedPath, "unfixedPath cannot be null"); this.curatorWatcher = null; } - NamespaceWatcher(CuratorFrameworkImpl client, CuratorWatcher curatorWatcher) + NamespaceWatcher(CuratorFrameworkImpl client, CuratorWatcher curatorWatcher, String unfixedPath) { this.client = client; this.actualWatcher = null; this.curatorWatcher = curatorWatcher; + this.unfixedPath = Preconditions.checkNotNull(unfixedPath, "unfixedPath cannot be null"); + } + + String getUnfixedPath() + { + return unfixedPath; } @Override @@ -57,6 +66,11 @@ class NamespaceWatcher implements Watcher, Closeable { if ( client != null ) { + if ( (event.getType() != Event.EventType.None) && (client.getWatcherRemovalManager() != null) ) + { + client.getWatcherRemovalManager().noteTriggeredWatcher(this); + } + if ( actualWatcher != null ) { actualWatcher.process(new NamespaceWatchedEvent(client, event)); @@ -75,4 +89,47 @@ class NamespaceWatcher implements Watcher, Closeable } } } + + // specialized equals()/hashCode() that makes this instance equal to the actual watcher + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null ) + { + return false; + } + + if ( getClass() == o.getClass() ) + { + NamespaceWatcher watcher = (NamespaceWatcher)o; + + //noinspection SimplifiableIfStatement + if ( actualWatcher != null ? !actualWatcher.equals(watcher.actualWatcher) : watcher.actualWatcher != null ) + { + return false; + } + return curatorWatcher != null ? curatorWatcher.equals(watcher.curatorWatcher) : watcher.curatorWatcher == null; + } + + //noinspection SimplifiableIfStatement + if ( Watcher.class.isAssignableFrom(o.getClass()) ) + { + return actualWatcher == o; + } + + return false; + } + + @Override + public int hashCode() + { + int result = actualWatcher != null ? actualWatcher.hashCode() : 0; + result = 31 * result + (curatorWatcher != null ? curatorWatcher.hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java deleted file mode 100644 index c864f44..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.imps; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.Watcher; -import java.io.Closeable; -import java.lang.reflect.Field; -import java.util.concurrent.ConcurrentMap; - -class NamespaceWatcherMap implements Closeable -{ - private final Cache<Object, NamespaceWatcher> cache = CacheBuilder.newBuilder() - .weakValues() - .<Object, NamespaceWatcher>build(); - private final ConcurrentMap<Object, NamespaceWatcher> map = cache.asMap(); - private final CuratorFrameworkImpl client; - - NamespaceWatcherMap(CuratorFrameworkImpl client) - { - this.client = client; - } - - @Override - public void close() - { - clear(); - } - - void clear() - { - map.clear(); - } - - @VisibleForTesting - void drain() throws Exception - { - Runtime.getRuntime().gc(); - - // relies on internals of MapMakerInternalMap (obviously) - Class mapMakerInternalMapClass = Class.forName("com.google.common.collect.MapMakerInternalMap"); - Field drainThresholdField = mapMakerInternalMapClass.getDeclaredField("DRAIN_THRESHOLD"); - drainThresholdField.setAccessible(true); - int drainThreshold = drainThresholdField.getInt(null) + 1; - while ( drainThreshold-- > 0 ) - { - map.get(new Object()); - } - } - - NamespaceWatcher get(Object key) - { - return map.get(key); - } - - NamespaceWatcher remove(Object key) - { - return map.remove(key); - } - - boolean removeWatcher(Object watcher) - { - //noinspection SuspiciousMethodCalls - return map.values().remove(watcher); - } - - @VisibleForTesting - boolean isEmpty() - { - cache.cleanUp(); - return map.isEmpty(); - } - - NamespaceWatcher getNamespaceWatcher(Watcher watcher) - { - return get(watcher, new NamespaceWatcher(client, watcher)); - } - - NamespaceWatcher getNamespaceWatcher(CuratorWatcher watcher) - { - return get(watcher, new NamespaceWatcher(client, watcher)); - } - - private NamespaceWatcher get(Object watcher, NamespaceWatcher newNamespaceWatcher) - { - NamespaceWatcher existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher); - return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher; - } - - @Override - public String toString() - { - return map.toString(); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index c1772f1..58fae29 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -18,33 +18,24 @@ */ package org.apache.curator.framework.imps; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; - import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.BackgroundPathableQuietlyable; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.Pathable; -import org.apache.curator.framework.api.RemoveWatchesLocal; -import org.apache.curator.framework.api.RemoveWatchesBuilder; -import org.apache.curator.framework.api.RemoveWatchesType; +import org.apache.curator.framework.api.*; import org.apache.curator.utils.DebugUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.ZooKeeper; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String> { private CuratorFrameworkImpl client; private Watcher watcher; + private CuratorWatcher curatorWatcher; private WatcherType watcherType; private boolean guaranteed; private boolean local; @@ -55,6 +46,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat { this.client = client; this.watcher = null; + this.curatorWatcher = null; this.watcherType = WatcherType.Any; this.guaranteed = false; this.local = false; @@ -83,26 +75,16 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat @Override public RemoveWatchesType remove(Watcher watcher) { - if(watcher == null) { - this.watcher = null; - } else { - //Try and get the namespaced version of the watcher. - this.watcher = client.getNamespaceWatcherMap().get(watcher); - - //If this is not present then default to the original watcher. This shouldn't happen in practice unless the user - //has added a watch directly to the ZK client rather than via the CuratorFramework. - if(this.watcher == null) { - this.watcher = watcher; - } - } - + this.watcher = watcher; + this.curatorWatcher = null; return this; } @Override public RemoveWatchesType remove(CuratorWatcher watcher) { - this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher); + this.watcher = null; + this.curatorWatcher = watcher; return this; } @@ -110,6 +92,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat public RemoveWatchesType removeAll() { this.watcher = null; + this.curatorWatcher = null; return this; } @@ -224,25 +207,25 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat private void pathInForeground(final String path) throws Exception { + NamespaceWatcher namespaceWatcher = makeNamespaceWatcher(path); //For the local case we don't want to use the normal retry loop and we don't want to block until a connection is available. //We just execute the removeWatch, and if it fails, ZK will just remove local watches. - if(local) + if ( local ) { ZooKeeper zkClient = client.getZooKeeper(); - if(watcher == null) + if ( namespaceWatcher != null ) { - client.getNamespaceWatcherMap().clear(); - zkClient.removeAllWatches(path, watcherType, local); + zkClient.removeWatches(path, namespaceWatcher, watcherType, local); } else { - client.getNamespaceWatcherMap().removeWatcher(watcher); - zkClient.removeWatches(path, watcher, watcherType, local); + zkClient.removeAllWatches(path, watcherType, local); } } else { - RetryLoop.callWithRetry(client.getZookeeperClient(), + final NamespaceWatcher finalNamespaceWatcher = namespaceWatcher; + RetryLoop.callWithRetry(client.getZookeeperClient(), new Callable<Void>() { @Override @@ -250,17 +233,15 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat { try { - ZooKeeper zkClient = client.getZookeeperClient().getZooKeeper(); - - if(watcher == null) + ZooKeeper zkClient = client.getZookeeperClient().getZooKeeper(); + + if ( finalNamespaceWatcher != null ) { - client.getNamespaceWatcherMap().clear(); - zkClient.removeAllWatches(path, watcherType, local); + zkClient.removeWatches(path, finalNamespaceWatcher, watcherType, false); } else { - client.getNamespaceWatcherMap().removeWatcher(watcher); - zkClient.removeWatches(path, watcher, watcherType, local); + zkClient.removeAllWatches(path, watcherType, false); } } catch(Exception e) @@ -268,12 +249,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat if( RetryLoop.isRetryException(e) && guaranteed ) { //Setup the guaranteed handler - client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher)); + client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, finalNamespaceWatcher)); throw e; } else if(e instanceof KeeperException.NoWatcherException && quietly) { - //Ignore + // ignore } else { @@ -287,7 +268,28 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat }); } } - + + private NamespaceWatcher makeNamespaceWatcher(String path) + { + NamespaceWatcher namespaceWatcher = null; + if ( watcher != null ) + { + if ( watcher instanceof NamespaceWatcher ) + { + namespaceWatcher = (NamespaceWatcher)watcher; + } + else + { + namespaceWatcher = new NamespaceWatcher(client, watcher, path); + } + } + else if ( curatorWatcher != null ) + { + namespaceWatcher = new NamespaceWatcher(client, curatorWatcher, path); + } + return namespaceWatcher; + } + @Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception @@ -304,17 +306,16 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat client.processBackgroundOperation(operationAndData, event); } }; - + ZooKeeper zkClient = client.getZooKeeper(); - if(watcher == null) + NamespaceWatcher namespaceWatcher = makeNamespaceWatcher(operationAndData.getData()); + if(namespaceWatcher == null) { - client.getNamespaceWatcherMap().clear(); zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); } else { - client.getNamespaceWatcherMap().removeWatcher(watcher); - zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext()); + zkClient.removeWatches(operationAndData.getData(), namespaceWatcher, watcherType, local, callback, operationAndData.getContext()); } } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index 30a992e..b5685f8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -28,8 +28,8 @@ import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -42,7 +42,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove { super(client); this.client = client; - removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap()); + removalManager = new WatcherRemovalManager(client); } @Override @@ -66,14 +66,6 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove public void removeWatchers() { removalManager.removeWatchers(); - - if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) ) - { - if ( !getNamespaceWatcherMap().isEmpty() ) - { - throw new RuntimeException("NamespaceWatcherMap is not empty: " + getNamespaceWatcherMap()); - } - } } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java index 1e6fe94..e3d20e2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java @@ -21,34 +21,26 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Set; public class WatcherRemovalManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; - private final NamespaceWatcherMap namespaceWatcherMap; - private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync + private final Set<NamespaceWatcher> entries = Sets.newHashSet(); // guarded by sync - WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap) + WatcherRemovalManager(CuratorFrameworkImpl client) { this.client = client; - this.namespaceWatcherMap = namespaceWatcherMap; } - synchronized Watcher add(String path, Watcher watcher) + synchronized void add(NamespaceWatcher watcher) { - path = Preconditions.checkNotNull(path, "path cannot be null"); watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null"); - - WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path); - entries.add(wrappedWatcher); - return wrappedWatcher; + entries.add(watcher); } @VisibleForTesting @@ -59,83 +51,29 @@ public class WatcherRemovalManager void removeWatchers() { - HashSet<WrappedWatcher> localEntries; + Set<NamespaceWatcher> localEntries; synchronized(this) { localEntries = Sets.newHashSet(entries); + entries.clear(); } - for ( WrappedWatcher entry : localEntries ) + for ( NamespaceWatcher watcher : localEntries ) { try { - log.debug("Removing watcher for path: " + entry.path); + log.debug("Removing watcher for path: " + watcher.getUnfixedPath()); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); - namespaceWatcherMap.removeWatcher(entry.watcher); - builder.internalRemoval(entry, entry.path); + builder.internalRemoval(watcher, watcher.getUnfixedPath()); } catch ( Exception e ) { - log.error("Could not remove watcher for path: " + entry.path); + log.error("Could not remove watcher for path: " + watcher.getUnfixedPath()); } } } - private synchronized void internalRemove(WrappedWatcher entry) - { - namespaceWatcherMap.removeWatcher(entry.watcher); - entries.remove(entry); - } - - private class WrappedWatcher implements Watcher + synchronized void noteTriggeredWatcher(NamespaceWatcher watcher) { - private final Watcher watcher; - private final String path; - - WrappedWatcher(Watcher watcher, String path) - { - this.watcher = watcher; - this.path = path; - } - - @Override - public void process(WatchedEvent event) - { - if ( event.getType() != Event.EventType.None ) - { - internalRemove(this); - } - watcher.process(event); - } - - @Override - public boolean equals(Object o) - { - if ( this == o ) - { - return true; - } - if ( o == null || getClass() != o.getClass() ) - { - return false; - } - - WrappedWatcher entry = (WrappedWatcher)o; - - //noinspection SimplifiableIfStatement - if ( !watcher.equals(entry.watcher) ) - { - return false; - } - return path.equals(entry.path); - - } - - @Override - public int hashCode() - { - int result = watcher.hashCode(); - result = 31 * result + path.hashCode(); - return result; - } + entries.remove(watcher); } } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index 4bebbd5..27d0a7c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import org.apache.curator.framework.api.CuratorWatcher; @@ -23,24 +24,28 @@ import org.apache.zookeeper.Watcher; class Watching { - private final Watcher watcher; - private final boolean watched; + private final Watcher watcher; + private final CuratorWatcher curatorWatcher; + private final boolean watched; Watching(boolean watched) { this.watcher = null; + this.curatorWatcher = null; this.watched = watched; } - Watching(CuratorFrameworkImpl client, Watcher watcher) + Watching(Watcher watcher) { - this.watcher = (watcher != null) ? client.getNamespaceWatcherMap().getNamespaceWatcher(watcher) : null; + this.watcher = watcher; + this.curatorWatcher = null; this.watched = false; } - Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) + Watching(CuratorWatcher watcher) { - this.watcher = (watcher != null) ? client.getNamespaceWatcherMap().getNamespaceWatcher(watcher) : null; + this.watcher = null; + this.curatorWatcher = watcher; this.watched = false; } @@ -48,15 +53,30 @@ class Watching { watcher = null; watched = false; + curatorWatcher = null; } Watcher getWatcher(CuratorFrameworkImpl client, String unfixedPath) { - if ( (watcher != null) && (client.getWatcherRemovalManager() != null) ) + NamespaceWatcher namespaceWatcher = null; + if ( watcher != null ) + { + namespaceWatcher = new NamespaceWatcher(client, this.watcher, unfixedPath); + } + else if ( curatorWatcher != null ) { - return client.getWatcherRemovalManager().add(unfixedPath, watcher); + namespaceWatcher = new NamespaceWatcher(client, curatorWatcher, unfixedPath); } - return watcher; + + if ( namespaceWatcher != null ) + { + if ( client.getWatcherRemovalManager() != null ) + { + client.getWatcherRemovalManager().add(namespaceWatcher); + } + } + + return namespaceWatcher; } boolean isWatched() http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index d7b79d3..70290ab 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -57,6 +57,21 @@ public class TestFrameworkEdges extends BaseClassForTests private final Timing timing = new Timing(); @Test + public void testQuickClose() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0)); + try + { + client.start(); + client.close(); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testProtectedCreateNodeDeletion() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 1, new RetryNTimes(0, 0)); http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index a7c137a..4ac68d3 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -161,10 +161,8 @@ public class TestRemoveWatches extends BaseClassForTests }; client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); - + client.watches().remove(watcher).forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -192,10 +190,8 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); client.watches().remove(watcher).forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -236,10 +232,8 @@ public class TestRemoveWatches extends BaseClassForTests }; client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); @@ -267,10 +261,8 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); client.watches().remove(watcher).inBackground().forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); @@ -301,10 +293,8 @@ public class TestRemoveWatches extends BaseClassForTests client.getChildren().usingWatcher(watcher1).forPath(path); client.checkExists().usingWatcher(watcher2).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); client.watches().removeAll().forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -400,7 +390,6 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); //Stop the server so we can check if we can remove watches locally when offline server.stop(); @@ -408,7 +397,6 @@ public class TestRemoveWatches extends BaseClassForTests Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } @@ -438,7 +426,6 @@ public class TestRemoveWatches extends BaseClassForTests Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved); client.checkExists().usingWatcher(watcher).forPath(path); - Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty()); //Stop the server so we can check if we can remove watches locally when offline server.stop(); @@ -446,7 +433,6 @@ public class TestRemoveWatches extends BaseClassForTests Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().inBackground().forPath(path); - Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty()); Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal"); } http://git-wip-us.apache.org/repos/asf/curator/blob/ff8a795e/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherIdentity.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherIdentity.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherIdentity.java index e950516..2a37052 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherIdentity.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherIdentity.java @@ -18,21 +18,19 @@ */ package org.apache.curator.framework.imps; -import org.apache.curator.test.BaseClassForTests; -import org.apache.curator.utils.CloseableUtils; +import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; public class TestWatcherIdentity extends BaseClassForTests @@ -63,71 +61,24 @@ public class TestWatcherIdentity extends BaseClassForTests } @Test - public void testRefExpiration() throws Exception + public void testSetAddition() { - final int MAX_CHECKS = 10; - - final CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try + Watcher watcher = new Watcher() { - Assert.assertNull(client.getNamespaceWatcherMap().get(new CountCuratorWatcher())); - - final CountDownLatch latch = new CountDownLatch(1); - ExecutorService service = Executors.newSingleThreadExecutor(); - service.submit - ( - new Callable<Void>() - { - @Override - public Void call() throws Exception - { - CountZKWatcher watcher = new CountZKWatcher(); - client.getNamespaceWatcherMap().getNamespaceWatcher(watcher); - Assert.assertNotNull(client.getNamespaceWatcherMap().get(watcher)); - latch.countDown(); - return null; - } - } - ); - - latch.await(); - service.shutdownNow(); - - Timing timing = new Timing(); - for ( int i = 0; i < MAX_CHECKS; ++i ) + @Override + public void process(WatchedEvent event) { - Assert.assertTrue((i + 1) < MAX_CHECKS); - timing.sleepABit(); - client.getNamespaceWatcherMap().drain(); // just to cause drainReferenceQueues() to get called - if ( client.getNamespaceWatcherMap().isEmpty() ) - { - break; - } } - } - finally - { - CloseableUtils.closeQuietly(client); - } - } - - @Test - public void testSimpleId() - { - CountCuratorWatcher curatorWatcher = new CountCuratorWatcher(); - CountZKWatcher zkWatcher = new CountZKWatcher(); - CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try - { - Assert.assertSame(client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher)); - Assert.assertSame(client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher)); - Assert.assertNotSame(client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher)); - } - finally - { - CloseableUtils.closeQuietly(client); - } + }; + NamespaceWatcher namespaceWatcher1 = new NamespaceWatcher(null, watcher, "/foo"); + NamespaceWatcher namespaceWatcher2 = new NamespaceWatcher(null, watcher, "/foo"); + Assert.assertEquals(namespaceWatcher1, namespaceWatcher2); + Assert.assertTrue(namespaceWatcher1.equals(watcher)); + Set<Watcher> set = Sets.newHashSet(); + set.add(namespaceWatcher1); + set.add(namespaceWatcher2); + Assert.assertEquals(set.size(), 1); } @Test
