Repository: curator
Updated Branches:
  refs/heads/master 49eb02a04 -> 67b122da5


Adding the notion of a 'lock schema' to ChildReaper that enables it to reap
both the direct children it is watching and the subnodes of those children.
This is necessary for InterProcessSemaphoreV2, which creates multiple subnodes
beneath its lock nodes and is otherwise unreapable by ChildReaper.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72aea4a3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72aea4a3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72aea4a3

Branch: refs/heads/master
Commit: 72aea4a30b36201fe2a673358c1e062d6b5109a7
Parents: 49eb02a
Author: David Kesler <[email protected]>
Authored: Mon Feb 9 16:34:20 2015 -0500
Committer: David Kesler <[email protected]>
Committed: Mon Feb 9 16:34:20 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    | 35 +++++++++++++++--
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++++
 .../framework/recipes/locks/LockSchema.java     | 22 +++++++++++
 .../locks/TestInterProcessSemaphore.java        | 40 ++++++++++++++++++++
 4 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 56c56ab..7935f0b 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -55,6 +55,7 @@ public class ChildReaper implements Closeable
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final LeaderLatch leaderLatch;
+    private final LockSchema lockSchema;
 
     private volatile Future<?> task;
 
@@ -108,6 +109,21 @@ public class ChildReaper implements Closeable
      */
     public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, 
ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
     {
+        this(client, path, mode, executor, reapingThresholdMs, leaderPath, new 
LockSchema());
+    }
+
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param executor executor to use for background tasks
+     * @param reapingThresholdMs threshold in milliseconds that determines 
that a path can be deleted
+     * @param mode reaping mode
+     * @param leaderPath if not null, uses a leader selection so that only 1 
reaper is active in the cluster
+     * @param lockSchema a set of the possible subnodes of the children of 
path that must be reaped in addition to the child nodes
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, 
ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, 
LockSchema lockSchema)
+    {
         this.client = client;
         this.mode = mode;
         this.executor = new CloseableScheduledExecutorService(executor);
@@ -121,6 +137,7 @@ public class ChildReaper implements Closeable
             leaderLatch = null;
         }
         this.reaper = new Reaper(client, executor, reapingThresholdMs, 
leaderLatch);
+        this.lockSchema = lockSchema;
         addPath(path);
     }
 
@@ -207,12 +224,13 @@ public class ChildReaper implements Closeable
                     List<String> children = client.getChildren().forPath(path);
                     for ( String name : children )
                     {
-                        String thisPath = ZKPaths.makePath(path, name);
-                        Stat stat = client.checkExists().forPath(thisPath);
-                        if ( (stat != null) && (stat.getNumChildren() == 0) )
+                        String childPath = ZKPaths.makePath(path, name);
+                        addPathToReaperIfEmpty(childPath);
+                        for ( String subNode : lockSchema.getPaths() )
                         {
-                            reaper.addPath(thisPath, mode);
+                            addPathToReaperIfEmpty(ZKPaths.makePath(childPath, 
subNode));
                         }
+
                     }
                 }
                 catch ( Exception e )
@@ -223,6 +241,15 @@ public class ChildReaper implements Closeable
         }
     }
 
+    private void addPathToReaperIfEmpty(String path) throws Exception
+    {
+        Stat stat = client.checkExists().forPath(path);
+        if ( (stat != null) && (stat.getNumChildren() == 0) )
+        {
+            reaper.addPath(path, mode);
+        }
+    }
+
     private boolean shouldDoWork()
     {
         return this.leaderLatch == null || this.leaderLatch.hasLeadership();

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 2e14ee1..55647ad 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -21,6 +21,8 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
@@ -92,6 +94,12 @@ public class InterProcessSemaphoreV2
     private static final String LOCK_PARENT = "locks";
     private static final String LEASE_PARENT = "leases";
     private static final String LEASE_BASE_NAME = "lease-";
+    public static final LockSchema LOCK_SCHEMA = new LockSchema(
+            Sets.newHashSet(
+                    LOCK_PARENT,
+                    LEASE_PARENT
+            )
+    );
 
     /**
      * @param client    the client

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
new file mode 100644
index 0000000..5794705
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.recipes.locks;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+public class LockSchema {
+    private final Set<String> paths;
+
+    public LockSchema() {
+        paths = new HashSet<String>();
+    }
+
+    public LockSchema(Set<String> paths) {
+        this.paths = Sets.newHashSet(paths);
+    }
+
+    public Set<String> getPaths() {
+        return Sets.newHashSet(paths);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index dd3f98f..631b7c7 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -531,4 +531,44 @@ public class TestInterProcessSemaphore extends 
BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testChildReaperCleansUpLockNodes() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+
+        ChildReaper childReaper = null;
+        try
+        {
+            InterProcessSemaphoreV2 semaphore = new 
InterProcessSemaphoreV2(client, "/test/lock", 1);
+            
semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), 
TimeUnit.SECONDS));
+
+            Assert.assertTrue(client.getChildren().forPath("/test").size() > 
0);
+
+            childReaper = new ChildReaper(
+                    client,
+                    "/test",
+                    Reaper.Mode.REAP_UNTIL_GONE,
+                    ChildReaper.newExecutorService(),
+                    1,
+                    "/test-leader",
+                    InterProcessSemaphoreV2.LOCK_SCHEMA
+            );
+            childReaper.start();
+
+            timing.forWaiting().sleepABit();
+
+            List<String> children = client.getChildren().forPath("/test");
+
+            Assert.assertEquals(children.size(), 0, "All children of /test 
should have been reaped");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(childReaper);
+            CloseableUtils.closeQuietly(client);
+        }
+
+    }
 }

Reply via email to