Repository: curator Updated Branches: refs/heads/master 49eb02a04 -> 67b122da5
Adding the notion of a 'lock schema' to ChildReaper that enables it to reap both the direct children it's watching and subnodes of those children. This is necessary with InterProcessSemaphoreV2 as it creates multiple subnodes beneath its lock nodes and is otherwise unreapable with ChildReaper. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72aea4a3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72aea4a3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72aea4a3 Branch: refs/heads/master Commit: 72aea4a30b36201fe2a673358c1e062d6b5109a7 Parents: 49eb02a Author: David Kesler <[email protected]> Authored: Mon Feb 9 16:34:20 2015 -0500 Committer: David Kesler <[email protected]> Committed: Mon Feb 9 16:34:20 2015 -0500 ---------------------------------------------------------------------- .../framework/recipes/locks/ChildReaper.java | 35 +++++++++++++++-- .../recipes/locks/InterProcessSemaphoreV2.java | 8 ++++ .../framework/recipes/locks/LockSchema.java | 22 +++++++++++ .../locks/TestInterProcessSemaphore.java | 40 ++++++++++++++++++++ 4 files changed, 101 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java index 56c56ab..7935f0b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java @@ -55,6 +55,7 @@ public class ChildReaper implements Closeable private final 
CloseableScheduledExecutorService executor; private final int reapingThresholdMs; private final LeaderLatch leaderLatch; + private final LockSchema lockSchema; private volatile Future<?> task; @@ -108,6 +109,21 @@ public class ChildReaper implements Closeable */ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath) { + this(client, path, mode, executor, reapingThresholdMs, leaderPath, new LockSchema()); + } + + + /** + * @param client the client + * @param path path to reap children from + * @param executor executor to use for background tasks + * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted + * @param mode reaping mode + * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster + * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes + */ + public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, LockSchema lockSchema) + { this.client = client; this.mode = mode; this.executor = new CloseableScheduledExecutorService(executor); @@ -121,6 +137,7 @@ public class ChildReaper implements Closeable leaderLatch = null; } this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch); + this.lockSchema = lockSchema; addPath(path); } @@ -207,12 +224,13 @@ public class ChildReaper implements Closeable List<String> children = client.getChildren().forPath(path); for ( String name : children ) { - String thisPath = ZKPaths.makePath(path, name); - Stat stat = client.checkExists().forPath(thisPath); - if ( (stat != null) && (stat.getNumChildren() == 0) ) + String childPath = ZKPaths.makePath(path, name); + addPathToReaperIfEmpty(childPath); + for ( String subNode : lockSchema.getPaths() ) { - 
reaper.addPath(thisPath, mode); + addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode)); } + } } catch ( Exception e ) @@ -223,6 +241,15 @@ public class ChildReaper implements Closeable } } + private void addPathToReaperIfEmpty(String path) throws Exception + { + Stat stat = client.checkExists().forPath(path); + if ( (stat != null) && (stat.getNumChildren() == 0) ) + { + reaper.addPath(path, mode); + } + } + private boolean shouldDoWork() { return this.leaderLatch == null || this.leaderLatch.hasLeadership(); http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java index 2e14ee1..55647ad 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java @@ -21,6 +21,8 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + import org.apache.curator.utils.CloseableUtils; import org.apache.curator.RetryLoop; import org.apache.curator.framework.CuratorFramework; @@ -92,6 +94,12 @@ public class InterProcessSemaphoreV2 private static final String LOCK_PARENT = "locks"; private static final String LEASE_PARENT = "leases"; private static final String LEASE_BASE_NAME = "lease-"; + public static final LockSchema LOCK_SCHEMA = new LockSchema( + Sets.newHashSet( + LOCK_PARENT, + LEASE_PARENT + ) + ); /** * @param client the client 
http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java new file mode 100644 index 0000000..5794705 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java @@ -0,0 +1,22 @@ +package org.apache.curator.framework.recipes.locks; + +import java.util.HashSet; +import java.util.Set; + +import com.google.common.collect.Sets; + +public class LockSchema { + private final Set<String> paths; + + public LockSchema() { + paths = new HashSet<String>(); + } + + public LockSchema(Set<String> paths) { + this.paths = Sets.newHashSet(paths); + } + + public Set<String> getPaths() { + return Sets.newHashSet(paths); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java index dd3f98f..631b7c7 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java @@ -531,4 +531,44 @@ public class TestInterProcessSemaphore extends BaseClassForTests CloseableUtils.closeQuietly(client); } } + + @Test + public void testChildReaperCleansUpLockNodes() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + + ChildReaper childReaper = null; + try + { + InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1); + semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS)); + + Assert.assertTrue(client.getChildren().forPath("/test").size() > 0); + + childReaper = new ChildReaper( + client, + "/test", + Reaper.Mode.REAP_UNTIL_GONE, + ChildReaper.newExecutorService(), + 1, + "/test-leader", + InterProcessSemaphoreV2.LOCK_SCHEMA + ); + childReaper.start(); + + timing.forWaiting().sleepABit(); + + List<String> children = client.getChildren().forPath("/test"); + + Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped"); + } + finally + { + CloseableUtils.closeQuietly(childReaper); + CloseableUtils.closeQuietly(client); + } + + } }
