Repository: curator
Updated Branches:
  refs/heads/CURATOR-170 [created] 915d66f90
Added new apis to add additional paths to ChildReaper and a test. This change is backward compatible Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/915d66f9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/915d66f9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/915d66f9 Branch: refs/heads/CURATOR-170 Commit: 915d66f90a12f1f89177cf4acf83d35e65949408 Parents: ef2ca57 Author: randgalt <[email protected]> Authored: Tue Dec 9 14:46:00 2014 -0500 Committer: randgalt <[email protected]> Committed: Tue Dec 9 14:46:00 2014 -0500 ---------------------------------------------------------------------- .../framework/recipes/locks/ChildReaper.java | 51 +++++++++++++++----- .../recipes/locks/TestChildReaper.java | 37 ++++++++++++++ 2 files changed, 75 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/915d66f9/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java index 5f28f82..4c254ac 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableScheduledExecutorService; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -46,7 +48,7 @@ public class ChildReaper implements Closeable private final Reaper reaper; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); private final CuratorFramework client; - private final String path; + private final Collection<String> paths = Sets.newConcurrentHashSet(); private final Reaper.Mode mode; private final CloseableScheduledExecutorService executor; private final int reapingThresholdMs; @@ -104,11 +106,11 @@ public class ChildReaper implements Closeable public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath) { this.client = client; - this.path = PathUtils.validatePath(path); this.mode = mode; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs; this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath); + addPath(path); } /** @@ -148,6 +150,26 @@ public class ChildReaper implements Closeable } } + /** + * Add a path to reap children from + * + * @param path the path + */ + public void addPath(String path) + { + paths.add(PathUtils.validatePath(path)); + } + + /** + * Remove a path from reaping + * + * @param path the path + */ + public void removePath(String path) + { + paths.remove(PathUtils.validatePath(path)); + } + private static ScheduledExecutorService newExecutorService() { return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper"); @@ -155,22 +177,25 @@ public class ChildReaper implements Closeable private void doWork() { - try + for ( String path : paths ) { - List<String> children = client.getChildren().forPath(path); - for ( String name : children ) + try { - String thisPath = ZKPaths.makePath(path, name); - Stat stat = 
client.checkExists().forPath(thisPath); - if ( (stat != null) && (stat.getNumChildren() == 0) ) + List<String> children = client.getChildren().forPath(path); + for ( String name : children ) { - reaper.addPath(thisPath, mode); + String thisPath = ZKPaths.makePath(path, name); + Stat stat = client.checkExists().forPath(thisPath); + if ( (stat != null) && (stat.getNumChildren() == 0) ) + { + reaper.addPath(thisPath, mode); + } } } - } - catch ( Exception e ) - { - log.error("Could not get children for path: " + path, e); + catch ( Exception e ) + { + log.error("Could not get children for path: " + path, e); + } } } } http://git-wip-us.apache.org/repos/asf/curator/blob/915d66f9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java index 309bd99..ad6ba6c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java @@ -100,6 +100,43 @@ public class TestChildReaper extends BaseClassForTests } @Test + public void testMultiPath() throws Exception + { + Timing timing = new Timing(); + ChildReaper reaper = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + + for ( int i = 0; i < 10; ++i ) + { + client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i)); + client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i)); + client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i)); + } + + reaper = new ChildReaper(client, 
"/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1); + reaper.start(); + reaper.addPath("/test1"); + + timing.forWaiting().sleepABit(); + + Stat stat = client.checkExists().forPath("/test1"); + Assert.assertEquals(stat.getNumChildren(), 0); + stat = client.checkExists().forPath("/test2"); + Assert.assertEquals(stat.getNumChildren(), 0); + stat = client.checkExists().forPath("/test3"); + Assert.assertEquals(stat.getNumChildren(), 10); + } + finally + { + CloseableUtils.closeQuietly(reaper); + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testNamespace() throws Exception { Timing timing = new Timing();
