Repository: curator Updated Branches: refs/heads/CURATOR-105 [created] a0fe6ae2f
The internal watcher map is holding on to references that make GCing them impossible. Adding clearWatcherReferences() to remove the references. However, still more work to do. Guava's cache seems to hold references. Is this a bug? Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a0fe6ae2 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a0fe6ae2 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a0fe6ae2 Branch: refs/heads/CURATOR-105 Commit: a0fe6ae2f280810f0bb746313bab0a69757b4c3c Parents: 0f7acf7 Author: randgalt <randg...@apache.org> Authored: Wed May 14 09:32:13 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Wed May 14 09:32:13 2014 -0500 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 9 +++ .../framework/imps/CuratorFrameworkImpl.java | 10 ++++ .../framework/imps/NamespaceWatcher.java | 38 +++++++----- .../framework/imps/NamespaceWatcherMap.java | 13 +++-- .../recipes/cache/PathChildrenCache.java | 17 +++++- .../x/discovery/ServiceCacheLeakTest.java | 61 ++++++++++++++++++++ 6 files changed, 128 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index dcbc567..2d6e182 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -26,6 +26,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.Watcher; import java.io.Closeable; /** @@ -201,4 +202,12 @@ public interface CuratorFramework extends Closeable * @return new EnsurePath instance */ public EnsurePath newNamespaceAwareEnsurePath(String path); + + /** + * Curator can hold internal references to watchers that may inhibit garbage collection. + * Call this method on watchers you are no longer interested in. + * + * @param watcher the watcher + */ + public void clearWatcherReferences(Watcher watcher); } http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 7854308..2238a56 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -206,6 +206,16 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override + public void clearWatcherReferences(Watcher watcher) + { + NamespaceWatcher namespaceWatcher = namespaceWatcherMap.remove(watcher); + if ( namespaceWatcher != null ) + { + namespaceWatcher.close(); + } + } + + @Override public CuratorFrameworkState getState() { return state.get(); http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java index 7a2e4c3..299e28b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcher.java @@ -21,12 +21,13 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import java.io.Closeable; -class NamespaceWatcher implements Watcher +class NamespaceWatcher implements Watcher, Closeable { - private final CuratorFrameworkImpl client; - private final Watcher actualWatcher; - private final CuratorWatcher curatorWatcher; + private volatile CuratorFrameworkImpl client; + private volatile Watcher actualWatcher; + private volatile CuratorWatcher curatorWatcher; NamespaceWatcher(CuratorFrameworkImpl client, Watcher actualWatcher) { @@ -43,21 +44,32 @@ class NamespaceWatcher implements Watcher } @Override + public void close() + { + client = null; + actualWatcher = null; + curatorWatcher = null; + } + + @Override public void process(WatchedEvent event) { - if ( actualWatcher != null ) - { - actualWatcher.process(new NamespaceWatchedEvent(client, event)); - } - else if ( curatorWatcher != null ) + if ( client != null ) { - try + if ( actualWatcher != null ) { - curatorWatcher.process(new NamespaceWatchedEvent(client, event)); + actualWatcher.process(new NamespaceWatchedEvent(client, event)); } - catch ( Exception e ) + else if ( curatorWatcher != null ) { - client.logError("Watcher exception", e); + try + { + curatorWatcher.process(new NamespaceWatchedEvent(client, event)); + } + catch ( Exception e ) + { + client.logError("Watcher exception", e); + } } } } http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java index 962036b..e5aecb2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java @@ -19,7 +19,7 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.MapMaker; +import com.google.common.cache.CacheBuilder; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.Watcher; import java.io.Closeable; @@ -28,9 +28,10 @@ import java.util.concurrent.ConcurrentMap; class NamespaceWatcherMap implements Closeable { - private final ConcurrentMap<Object, NamespaceWatcher> map = new MapMaker() + private final ConcurrentMap<Object, NamespaceWatcher> map = CacheBuilder.newBuilder() .weakValues() - .makeMap(); + .<Object, NamespaceWatcher>build() + .asMap(); private final CuratorFrameworkImpl client; NamespaceWatcherMap(CuratorFrameworkImpl client) @@ -60,12 +61,16 @@ class NamespaceWatcherMap implements Closeable } } - @VisibleForTesting NamespaceWatcher get(Object key) { return map.get(key); } + NamespaceWatcher remove(Object key) + { + return map.remove(key); + } + @VisibleForTesting boolean isEmpty() { http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 855d060..fabdd49 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -89,7 +89,7 @@ public class PathChildrenCache implements Closeable private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null); - private final Watcher childrenWatcher = new Watcher() + private volatile Watcher childrenWatcher = new Watcher() { @Override public void process(WatchedEvent event) @@ -98,7 +98,7 @@ public class PathChildrenCache implements Closeable } }; - private final Watcher dataWatcher = new Watcher() + private volatile Watcher dataWatcher = new Watcher() { @Override public void process(WatchedEvent event) @@ -124,7 +124,7 @@ public class PathChildrenCache implements Closeable @VisibleForTesting volatile Exchanger<Object> rebuildTestExchanger; - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) @@ -366,7 +366,18 @@ public class PathChildrenCache implements Closeable if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { client.getConnectionStateListenable().removeListener(connectionStateListener); + listeners.clear(); executorService.close(); + client.clearWatcherReferences(childrenWatcher); + client.clearWatcherReferences(dataWatcher); + +/* + This seems to enable even more GC - I'm not sure why yet + + connectionStateListener = null; + childrenWatcher = null; + dataWatcher = null; +*/ } } http://git-wip-us.apache.org/repos/asf/curator/blob/a0fe6ae2/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTest.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTest.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTest.java new file mode 100644 index 0000000..a8c130c --- /dev/null +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/ServiceCacheLeakTest.java @@ -0,0 +1,61 @@ +package org.apache.curator.x.discovery; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.discovery.strategies.RandomStrategy; +import org.testng.annotations.Test; + +public class ServiceCacheLeakTest +{ + @Test + public void serviceCacheInstancesLeaked() throws Exception + { + TestingServer testingServer = new TestingServer(); + + final CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1)); + try + { + curatorFramework.start(); + + doWork(curatorFramework); + System.gc(); + + System.out.println("Done - get dump"); + Thread.currentThread().join(); + } + finally + { + CloseableUtils.closeQuietly(curatorFramework); + CloseableUtils.closeQuietly(testingServer); + } + } + + private void doWork(CuratorFramework curatorFramework) throws Exception + { + ServiceInstance<Void> thisInstance = ServiceInstance.<Void>builder().name("myservice").build(); + final ServiceDiscovery<Void> serviceDiscovery = ServiceDiscoveryBuilder.builder(Void.class).client(curatorFramework.usingNamespace("dev")).basePath("/instances").thisInstance(thisInstance).build(); + serviceDiscovery.start(); + + for ( int i = 0; i < 100000; i++ ) + { + final ServiceProvider<Void> s = serviceProvider(serviceDiscovery, "myservice"); + s.start(); + try + { + s.getInstance().buildUriSpec(); + } + finally + { + s.close(); + } + } + } + + private ServiceProvider<Void> serviceProvider(ServiceDiscovery<Void> serviceDiscovery, String name) throws Exception + { + return serviceDiscovery.serviceProviderBuilder().serviceName(name).providerStrategy(new RandomStrategy<Void>()).build(); + } +} \ No newline at end of file