Repository: curator
Updated Branches:
  refs/heads/master 9f108285d -> a26872775


CURATOR-73 - add additional clearIsQueued() for safety


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a2687277
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a2687277
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a2687277

Branch: refs/heads/master
Commit: a26872775f4f86d7c575b48623ca47e1115e57ea
Parents: 9f10828
Author: randgalt <randg...@apache.org>
Authored: Sun Feb 23 15:35:50 2014 +0530
Committer: randgalt <randg...@apache.org>
Committed: Sun Feb 23 15:35:50 2014 +0530

----------------------------------------------------------------------
 .../recipes/leader/LeaderSelector.java          | 33 +++++++-----
 .../recipes/leader/TestLeaderSelector.java      | 57 ++++++++++++++------
 2 files changed, 59 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a2687277/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 6a15f23..ac10733 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -117,20 +117,21 @@ public class LeaderSelector implements Closeable
     }
 
     /**
-     * @param client the client
-     * @param leaderPath the path for this leadership group
+     * @param client          the client
+     * @param leaderPath      the path for this leadership group
      * @param executorService thread pool to use
-     * @param listener listener
+     * @param listener        listener
      */
-    public LeaderSelector(CuratorFramework client, String leaderPath, 
ExecutorService executorService, LeaderSelectorListener listener) {
+    public LeaderSelector(CuratorFramework client, String leaderPath, 
ExecutorService executorService, LeaderSelectorListener listener)
+    {
         this(client, leaderPath, new 
CloseableExecutorService(executorService), listener);
     }
 
     /**
-     * @param client the client
-     * @param leaderPath the path for this leadership group
+     * @param client          the client
+     * @param leaderPath      the path for this leadership group
      * @param executorService thread pool to use
-     * @param listener listener
+     * @param listener        listener
      */
     public LeaderSelector(CuratorFramework client, String leaderPath, 
CloseableExecutorService executorService, LeaderSelectorListener listener)
     {
@@ -230,18 +231,22 @@ public class LeaderSelector implements Closeable
         if ( !isQueued )
         {
             isQueued = true;
-            Future<Void> task = executorService.submit
-            (
-                new Callable<Void>()
+            Future<Void> task = executorService.submit(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
                 {
-                    @Override
-                    public Void call() throws Exception
+                    try
                     {
                         doWorkLoop();
-                        return null;
                     }
+                    finally
+                    {
+                        clearIsQueued();
+                    }
+                    return null;
                 }
-            );
+            });
             ourTask.set(task);
 
             return true;

http://git-wip-us.apache.org/repos/asf/curator/blob/a2687277/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index bfc8c97..1ae041b 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -20,7 +20,6 @@
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -29,6 +28,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.internal.annotations.Sets;
@@ -41,6 +41,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.testng.Assert.fail;
+
 public class TestLeaderSelector extends BaseClassForTests
 {
     private static final String PATH_NAME = "/one/two/me";
@@ -275,7 +277,7 @@ public class TestLeaderSelector extends BaseClassForTests
     }
 
     @Test
-    public void testKillSession() throws Exception
+    public void testKillSessionThenCloseShouldElectNewLeader() throws Exception
     {
         final Timing timing = new Timing();
 
@@ -286,7 +288,7 @@ public class TestLeaderSelector extends BaseClassForTests
             final Semaphore semaphore = new Semaphore(0);
             final CountDownLatch interruptedLatch = new CountDownLatch(1);
             final AtomicInteger leaderCount = new AtomicInteger(0);
-            LeaderSelectorListener listener = new LeaderSelectorListener()
+            LeaderSelectorListener listener = new 
LeaderSelectorListenerAdapter()
             {
                 @Override
                 public void takeLeadership(CuratorFramework client) throws 
Exception
@@ -297,7 +299,7 @@ public class TestLeaderSelector extends BaseClassForTests
                         semaphore.release();
                         try
                         {
-                            Thread.sleep(1000000);
+                            Thread.currentThread().join();
                         }
                         catch ( InterruptedException e )
                         {
@@ -310,19 +312,13 @@ public class TestLeaderSelector extends BaseClassForTests
                         leaderCount.decrementAndGet();
                     }
                 }
-
-                @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
-                {
-                    if ( newState == ConnectionState.LOST )
-                    {
-                        throw new CancelLeadershipException();
-                    }
-                }
             };
             LeaderSelector leaderSelector1 = new LeaderSelector(client, 
PATH_NAME, listener);
             LeaderSelector leaderSelector2 = new LeaderSelector(client, 
PATH_NAME, listener);
 
+            boolean leaderSelector1Closed = false;
+            boolean leaderSelector2Closed = false;
+
             leaderSelector1.start();
             leaderSelector2.start();
 
@@ -333,14 +329,41 @@ public class TestLeaderSelector extends BaseClassForTests
             Assert.assertTrue(timing.awaitLatch(interruptedLatch));
             timing.sleepABit();
 
-            leaderSelector1.requeue();
-            leaderSelector2.requeue();
+            boolean requeued1 = leaderSelector1.requeue();
+            boolean requeued2 = leaderSelector2.requeue();
+            Assert.assertTrue(requeued1);
+            Assert.assertTrue(requeued2);
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
             Assert.assertEquals(leaderCount.get(), 1);
 
-            leaderSelector1.close();
-            leaderSelector2.close();
+            if ( leaderSelector1.hasLeadership() )
+            {
+                leaderSelector1.close();
+                leaderSelector1Closed = true;
+            }
+            else if ( leaderSelector2.hasLeadership() )
+            {
+                leaderSelector2.close();
+                leaderSelector2Closed = true;
+            }
+            else
+            {
+                fail("No leaderselector has leadership!");
+            }
+
+            // Verify that the other leader took over leadership.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+            Assert.assertEquals(leaderCount.get(), 1);
+
+            if ( !leaderSelector1Closed )
+            {
+                leaderSelector1.close();
+            }
+            if ( !leaderSelector2Closed )
+            {
+                leaderSelector2.close();
+            }
         }
         finally
         {

Reply via email to