Repository: curator Updated Branches: refs/heads/CURATOR-187 [created] 49eb02a04
Adding a new constructor to Reaper so that it can optionally take a fully constructed leader latch that is owned by another class rather than create its own leader latch Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/520ae54a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/520ae54a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/520ae54a Branch: refs/heads/CURATOR-187 Commit: 520ae54ac4a49292201417fa6b1104cf579704d3 Parents: febfcec Author: David Kesler <[email protected]> Authored: Mon Feb 9 13:19:20 2015 -0500 Committer: David Kesler <[email protected]> Committed: Mon Feb 9 13:19:20 2015 -0500 ---------------------------------------------------------------------- .../curator/framework/recipes/locks/Reaper.java | 62 +++++++++++--- .../framework/recipes/locks/TestReaper.java | 90 +++++++++++++++++++- 2 files changed, 137 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java index 8802372..660e3d3 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java @@ -52,6 +52,7 @@ public class Reaper implements Closeable private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); private final LeaderLatch leaderLatch; private final AtomicBoolean reapingIsActive = new AtomicBoolean(true); + private final boolean ownsLeaderLatch; private enum State { @@ -111,7 +112,7 @@ public class Reaper implements Closeable */ public Reaper(CuratorFramework client) { - this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null); + this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, (String) null); } /** @@ -122,7 +123,7 @@ public class Reaper implements Closeable */ public Reaper(CuratorFramework client, int reapingThresholdMs) { - this(client, newExecutorService(), reapingThresholdMs, null); + this(client, newExecutorService(), reapingThresholdMs, (String) null); } /** @@ -132,7 +133,7 @@ public class Reaper implements Closeable */ public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs) { - this(client, executor, reapingThresholdMs, null); + this(client, executor, reapingThresholdMs, (String) null); } /** @@ -143,18 +144,41 @@ public class Reaper implements Closeable */ public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath) { + this(client, executor, reapingThresholdMs, makeLeaderLatchIfPathNotNull(client, leaderPath), true); + } + + /** + * @param client client + * @param executor thread pool + * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted + * @param leaderLatch a pre-created leader latch to ensure only 1 reaper is active in the cluster + */ + public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch) + { + this(client, executor, reapingThresholdMs, leaderLatch, false); + } + + /** + * @param client client + * @param executor thread pool + * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted + * @param leaderLatch a pre-created leader latch to ensure only 1 reaper is active in the cluster + * @param ownsLeaderLatch indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it + * */ + private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch) + { this.client = client; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD; - - LeaderLatch localLeaderLatch = null; - if ( leaderPath != null ) + this.leaderLatch = leaderLatch; + if (leaderLatch != null) { - localLeaderLatch = makeLeaderLatch(client, leaderPath); + addListenerToLeaderLatch(leaderLatch); } - leaderLatch = localLeaderLatch; + this.ownsLeaderLatch = ownsLeaderLatch; } + /** * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically * until the reaper is closed. @@ -200,7 +224,7 @@ public class Reaper implements Closeable { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - if ( leaderLatch != null ) + if ( leaderLatch != null && ownsLeaderLatch) { leaderLatch.start(); } @@ -212,7 +236,7 @@ public class Reaper implements Closeable if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { executor.close(); - if ( leaderLatch != null ) + if ( leaderLatch != null && ownsLeaderLatch ) { leaderLatch.close(); } @@ -310,11 +334,10 @@ public class Reaper implements Closeable return ThreadUtils.newSingleThreadScheduledExecutor("Reaper"); } - private LeaderLatch makeLeaderLatch(CuratorFramework client, String leaderPath) + private void addListenerToLeaderLatch(LeaderLatch leaderLatch) { reapingIsActive.set(false); - LeaderLatch localLeaderLatch = new LeaderLatch(client, leaderPath); LeaderLatchListener listener = new LeaderLatchListener() { @Override @@ -333,7 +356,18 @@ public class Reaper implements Closeable reapingIsActive.set(false); } }; - localLeaderLatch.addListener(listener); - return localLeaderLatch; + leaderLatch.addListener(listener); + } + + private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath) + { + if (leaderPath == null) + { + return null; + } + else + { + return new LeaderLatch(client, leaderPath); + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java index 83ec960..c47808f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; @@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestReaper extends BaseClassForTests { @Test - public void testUsingLeader() throws Exception + public void testUsingLeaderPath() throws Exception { final Timing timing = new Timing(); CuratorFramework client = makeClient(timing, null); @@ -118,6 +119,93 @@ public class TestReaper extends BaseClassForTests } @Test + public void testUsingLeaderLatch() throws Exception + { + final Timing timing = new Timing(); + CuratorFramework client = makeClient(timing, null); + Reaper reaper1 = null; + Reaper reaper2 = null; + LeaderLatch leaderLatch1 = null; + LeaderLatch leaderLatch2 = null; + try + { + final AtomicInteger reaper1Count = new AtomicInteger(); + leaderLatch1 = new LeaderLatch(client, "/reaper/leader"); + reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch1) + { + @Override + protected void reap(PathHolder holder) + { + reaper1Count.incrementAndGet(); + super.reap(holder); + } + }; + + final AtomicInteger reaper2Count = new AtomicInteger(); + leaderLatch2 = new LeaderLatch(client, "/reaper/leader"); + reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch2) + { + @Override + protected void reap(PathHolder holder) + { + reaper2Count.incrementAndGet(); + super.reap(holder); + } + }; + + client.start(); + client.create().creatingParentsIfNeeded().forPath("/one/two/three"); + + leaderLatch1.start(); + leaderLatch2.start(); + + reaper1.start(); + reaper2.start(); + + reaper1.addPath("/one/two/three"); + reaper2.addPath("/one/two/three"); + + timing.sleepABit(); + + Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0)); + Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0)); + + Reaper activeReaper; + LeaderLatch activeLeaderLeatch; + AtomicInteger inActiveReaperCount; + if ( reaper1Count.get() > 0 ) + { + activeReaper = reaper1; + activeLeaderLeatch = leaderLatch1; + inActiveReaperCount = reaper2Count; + } + else + { + activeReaper = reaper2; + activeLeaderLeatch = leaderLatch2; + inActiveReaperCount = reaper1Count; + } + Assert.assertEquals(inActiveReaperCount.get(), 0); + activeReaper.close(); + activeLeaderLeatch.close(); + timing.sleepABit(); + Assert.assertTrue(inActiveReaperCount.get() > 0); + } + finally + { + CloseableUtils.closeQuietly(reaper1); + CloseableUtils.closeQuietly(reaper2); + if (leaderLatch1 != null && LeaderLatch.State.STARTED == leaderLatch1.getState()) { + CloseableUtils.closeQuietly(leaderLatch1); + } + if (leaderLatch2 != null && LeaderLatch.State.STARTED == leaderLatch2.getState()) { + CloseableUtils.closeQuietly(leaderLatch2); + } + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testUsingManualLeader() throws Exception { final Timing timing = new Timing();
