The path should be part of NamespaceWatcher's identity. Also, I can simplify WatcherRemovalManager
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3db10810 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3db10810 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3db10810 Branch: refs/heads/better-namspace-watcher Commit: 3db108102b64255671e472725dc8496a45d8a7e4 Parents: ff8a795 Author: randgalt <[email protected]> Authored: Thu Feb 4 20:48:17 2016 -0500 Committer: randgalt <[email protected]> Committed: Thu Feb 4 20:48:17 2016 -0500 ---------------------------------------------------------------------- .../framework/imps/NamespaceWatcher.java | 6 +++ .../framework/imps/WatcherRemovalManager.java | 39 ++++++++++---------- 2 files changed, 26 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/3db10810/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java index f9af7ca..e597095 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java @@ -108,6 +108,11 @@ class NamespaceWatcher implements Watcher, Closeable { NamespaceWatcher watcher = (NamespaceWatcher)o; + if ( !unfixedPath.equals(watcher.getUnfixedPath()) ) + { + return false; + } + //noinspection SimplifiableIfStatement if ( actualWatcher != null ? !actualWatcher.equals(watcher.actualWatcher) : watcher.actualWatcher != null ) { @@ -129,6 +134,7 @@ class NamespaceWatcher implements Watcher, Closeable public int hashCode() { int result = actualWatcher != null ? actualWatcher.hashCode() : 0; + result = 31 * result + unfixedPath.hashCode(); result = 31 * result + (curatorWatcher != null ? curatorWatcher.hashCode() : 0); return result; } http://git-wip-us.apache.org/repos/asf/curator/blob/3db10810/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java index e3d20e2..b9c9044 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java @@ -20,59 +20,60 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Set; public class WatcherRemovalManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; - private final Set<NamespaceWatcher> entries = Sets.newHashSet(); // guarded by sync + private final Set<NamespaceWatcher> entries = Sets.newConcurrentHashSet(); WatcherRemovalManager(CuratorFrameworkImpl client) { this.client = client; } - synchronized void add(NamespaceWatcher watcher) + void add(NamespaceWatcher watcher) { watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null"); entries.add(watcher); } @VisibleForTesting - synchronized Set<? extends Watcher> getEntries() + Set<? extends Watcher> getEntries() { return Sets.newHashSet(entries); } void removeWatchers() { - Set<NamespaceWatcher> localEntries; - synchronized(this) + List<NamespaceWatcher> localEntries = Lists.newArrayList(entries); + while ( localEntries.size() > 0 ) { - localEntries = Sets.newHashSet(entries); - entries.clear(); - } - for ( NamespaceWatcher watcher : localEntries ) - { - try - { - log.debug("Removing watcher for path: " + watcher.getUnfixedPath()); - RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); - builder.internalRemoval(watcher, watcher.getUnfixedPath()); - } - catch ( Exception e ) + NamespaceWatcher watcher = localEntries.remove(0); + if ( entries.remove(watcher) ) { - log.error("Could not remove watcher for path: " + watcher.getUnfixedPath()); + try + { + log.debug("Removing watcher for path: " + watcher.getUnfixedPath()); + RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); + builder.internalRemoval(watcher, watcher.getUnfixedPath()); + } + catch ( Exception e ) + { + log.error("Could not remove watcher for path: " + watcher.getUnfixedPath()); + } } } } - synchronized void noteTriggeredWatcher(NamespaceWatcher watcher) + void noteTriggeredWatcher(NamespaceWatcher watcher) { entries.remove(watcher); }
