Repository: curator Updated Branches: refs/heads/master 9f108285d -> a26872775
CURATOR-73 - add additional clearIsQueued() for safety Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a2687277 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a2687277 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a2687277 Branch: refs/heads/master Commit: a26872775f4f86d7c575b48623ca47e1115e57ea Parents: 9f10828 Author: randgalt <randg...@apache.org> Authored: Sun Feb 23 15:35:50 2014 +0530 Committer: randgalt <randg...@apache.org> Committed: Sun Feb 23 15:35:50 2014 +0530 ---------------------------------------------------------------------- .../recipes/leader/LeaderSelector.java | 33 +++++++----- .../recipes/leader/TestLeaderSelector.java | 57 ++++++++++++++------ 2 files changed, 59 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/a2687277/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java index 6a15f23..ac10733 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java @@ -117,20 +117,21 @@ public class LeaderSelector implements Closeable } /** - * @param client the client - * @param leaderPath the path for this leadership group + * @param client the client + * @param leaderPath the path for this leadership group * @param executorService thread pool to use - * @param listener listener + * @param listener listener */ - public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener) { + public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener) + { this(client, leaderPath, new CloseableExecutorService(executorService), listener); } /** - * @param client the client - * @param leaderPath the path for this leadership group + * @param client the client + * @param leaderPath the path for this leadership group * @param executorService thread pool to use - * @param listener listener + * @param listener listener */ public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) { @@ -230,18 +231,22 @@ public class LeaderSelector implements Closeable if ( !isQueued ) { isQueued = true; - Future<Void> task = executorService.submit - ( - new Callable<Void>() + Future<Void> task = executorService.submit(new Callable<Void>() + { + @Override + public Void call() throws Exception { - @Override - public Void call() throws Exception + try { doWorkLoop(); - return null; } + finally + { + clearIsQueued(); + } + return null; } - ); + }); ourTask.set(task); return true; http://git-wip-us.apache.org/repos/asf/curator/blob/a2687277/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index bfc8c97..1ae041b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.leader; import com.google.common.collect.Lists; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.BaseClassForTests; @@ -29,6 +28,7 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.KillSession; import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; import org.testng.Assert; import org.testng.annotations.Test; import org.testng.internal.annotations.Sets; @@ -41,6 +41,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.testng.Assert.fail; + public class TestLeaderSelector extends BaseClassForTests { private static final String PATH_NAME = "/one/two/me"; @@ -275,7 +277,7 @@ public class TestLeaderSelector extends BaseClassForTests } @Test - public void testKillSession() throws Exception + public void testKillSessionThenCloseShouldElectNewLeader() throws Exception { final Timing timing = new Timing(); @@ -286,7 +288,7 @@ public class TestLeaderSelector extends BaseClassForTests final Semaphore semaphore = new Semaphore(0); final CountDownLatch interruptedLatch = new CountDownLatch(1); final AtomicInteger leaderCount = new AtomicInteger(0); - LeaderSelectorListener listener = new LeaderSelectorListener() + LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework client) throws Exception @@ -297,7 +299,7 @@ public class TestLeaderSelector extends BaseClassForTests semaphore.release(); try { - Thread.sleep(1000000); + Thread.currentThread().join(); } catch ( InterruptedException e ) { @@ -310,19 +312,13 @@ public class TestLeaderSelector extends BaseClassForTests leaderCount.decrementAndGet(); } } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if ( newState == ConnectionState.LOST ) - { - throw new CancelLeadershipException(); - } - } }; LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, listener); LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, listener); + boolean leaderSelector1Closed = false; + boolean leaderSelector2Closed = false; + leaderSelector1.start(); leaderSelector2.start(); @@ -333,14 +329,41 @@ public class TestLeaderSelector extends BaseClassForTests Assert.assertTrue(timing.awaitLatch(interruptedLatch)); timing.sleepABit(); - leaderSelector1.requeue(); - leaderSelector2.requeue(); + boolean requeued1 = leaderSelector1.requeue(); + boolean requeued2 = leaderSelector2.requeue(); + Assert.assertTrue(requeued1); + Assert.assertTrue(requeued2); Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); Assert.assertEquals(leaderCount.get(), 1); - leaderSelector1.close(); - leaderSelector2.close(); + if ( leaderSelector1.hasLeadership() ) + { + leaderSelector1.close(); + leaderSelector1Closed = true; + } + else if ( leaderSelector2.hasLeadership() ) + { + leaderSelector2.close(); + leaderSelector2Closed = true; + } + else + { + fail("No leaderselector has leadership!"); + } + + // Verify that the other leader took over leadership. + Assert.assertTrue(timing.acquireSemaphore(semaphore, 1)); + Assert.assertEquals(leaderCount.get(), 1); + + if ( !leaderSelector1Closed ) + { + leaderSelector1.close(); + } + if ( !leaderSelector2Closed ) + { + leaderSelector2.close(); + } } finally {