This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3a1453 [CURATOR-653] Proposed changes based on PR #398 (#436)
9b3a1453 is described below
commit 9b3a145372d9a8bc9a71adb630a1ad9b71fd2889
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Oct 18 12:08:03 2022 +0200
[CURATOR-653] Proposed changes based on PR #398 (#436)
Co-authored-by: shixiaoxiao <[email protected]>
Co-authored-by: tison <[email protected]>
---
.../framework/recipes/leader/LeaderLatch.java | 9 +-
.../framework/recipes/leader/TestLeaderLatch.java | 151 +++++++++++++++++++++
2 files changed, 159 insertions(+), 1 deletion(-)
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index e8187cec..553e5070 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -540,10 +540,17 @@ public class LeaderLatch implements Closeable
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;
+ @VisibleForTesting
+ volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
+
@VisibleForTesting
void reset() throws Exception
{
setLeadership(false);
+ if ( debugResetWaitBeforeNodeDeleteLatch != null )
+ {
+ debugResetWaitBeforeNodeDeleteLatch.await();
+ }
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
@@ -623,6 +630,7 @@ public class LeaderLatch implements Closeable
}
else
{
+ setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@@ -726,7 +734,6 @@ public class LeaderLatch implements Closeable
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);
-
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(LeaderLatchListener::notLeader);
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 671e3c4b..69deb207 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,7 +29,11 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
@@ -220,6 +224,153 @@ public class TestLeaderLatch extends BaseClassForTests
}
}
+ @Test // Exercises two LeaderLatch instances whose reset() calls overlap, pausing each reset via the debug latches.
+ public void testResettingOfLeadershipAfterConcurrentLeadershipChange()
throws Exception
+ { // The steps below assert the expected events and hasLeadership() values as leadership moves latch0 -> latch1 -> latch0.
+ final String latchPath = "/test";
+ final Timing2 timing = new Timing2();
+ final BlockingQueue<TestEvent> events =
Queues.newLinkedBlockingQueue();
+
+ final List<Closeable> closeableResources = new ArrayList<>(); // closed in reverse order in the finally block
+ try
+ {
+ final String id0 = "id0";
+ final CuratorFramework client0 =
createAndStartClient(server.getConnectString(), timing, id0, events);
+ closeableResources.add(client0);
+ final LeaderLatch latch0 = createAndStartLeaderLatch(client0,
latchPath, id0, events);
+ closeableResources.add(latch0);
+
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ final String id1 = "id1";
+ final CuratorFramework client1 =
createAndStartClient(server.getConnectString(), timing, id1, events);
+ closeableResources.add(client1);
+ final LeaderLatch latch1 = createAndStartLeaderLatch(client1,
latchPath, id1, events);
+ closeableResources.add(latch1);
+
+ assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ // wait for the non-leading LeaderLatch (i.e. latch1) instance to
be done with its creation
+ // this call is time-consuming but necessary because we don't have
a handle to detect the end of the reset call
+ timing.forWaiting().sleepABit();
+
+ assertTrue(latch0.hasLeadership());
+ assertFalse(latch1.hasLeadership());
+
+ latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); // awaited inside reset() after setLeadership(false) and before setNode(null)
+ latch1.debugResetWaitLatch = new CountDownLatch(1);
+ latch0.debugResetWaitLatch = new CountDownLatch(1);
+
+ // force latch0 and latch1 reset to trigger the actual test
+ latch0.reset();
+ // latch1.reset() needs to be called within a separate thread since it's
going to be blocked by the CountDownLatch outside an async call
+ ForkJoinPool.commonPool().submit(() -> {
+ latch1.reset();
+ return null;
+ });
+
+ // latch0.reset() will result in it losing its leadership,
deleting its old child node and creating a new child node before being blocked
by its debugResetWaitLatch
+ assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ // latch1.reset() is blocked but latch1 will gain leadership due to
its node watching latch0's node to be deleted
+ assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ assertFalse(latch0.hasLeadership());
+ assertTrue(latch1.hasLeadership());
+
+ // latch0.reset() continues with the getChildren call, finds
itself not being the leader and starts listening to the node created by latch1
+ latch0.debugResetWaitLatch.countDown();
+ timing.sleepABit();
+
+ // latch1.reset() continues, deletes its old child node and
creates a new child node before being blocked by its debugResetWaitLatch
+ latch1.debugResetWaitBeforeNodeDeleteLatch.countDown();
+
+ // latch0 receives NodeDeleteEvent and then finds itself to be the
leader
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP),
events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ assertTrue(latch0.hasLeadership());
+
+ // latch1.reset() continues and finds itself not being the leader
+ latch1.debugResetWaitLatch.countDown();
+ // this call is time-consuming but necessary because we don't have
a handle to detect the end of the reset call
+ timing.forWaiting().sleepABit();
+
+ assertTrue(latch0.hasLeadership());
+ assertFalse(latch1.hasLeadership());
+ }
+ finally
+ {
+ // reverse is necessary for closing the LeaderLatch instances
before closing the corresponding client
+ Collections.reverse(closeableResources);
+ closeableResources.forEach(CloseableUtils::closeQuietly);
+ }
+ }
+
+    /**
+     * Builds a client for {@code zkConnectString} using the connection and session timeouts of
+     * {@code timing}, registers a connection-state listener, starts the client and returns it.
+     *
+     * <p>The listener adds a {@code GAINED_CONNECTION} event tagged with {@code id} to
+     * {@code events} each time the reported state is {@code ConnectionState.CONNECTED}.
+     */
+    private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection<TestEvent> events)
+    {
+        final CuratorFramework framework = CuratorFrameworkFactory.builder()
+            .connectString(zkConnectString)
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new RetryOneTime(1))
+            .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+            .build();
+
+        framework.getConnectionStateListenable().addListener((ignoredClient, state) -> {
+            // only the CONNECTED state is of interest to the tests
+            if ( state != ConnectionState.CONNECTED )
+            {
+                return;
+            }
+            events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
+        });
+
+        framework.start();
+        return framework;
+    }
+
+ private static LeaderLatch createAndStartLeaderLatch(CuratorFramework
client, String latchPath, String id, Collection<TestEvent> events) throws
Exception
+ {
+ final LeaderLatch latch = new LeaderLatch(client, latchPath, id);
+ latch.addListener(new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ events.add(new TestEvent(latch.getId(),
TestEventType.GAINED_LEADERSHIP));
+ }
+
+ @Override
+ public void notLeader() {
+ events.add(new TestEvent(latch.getId(),
TestEventType.LOST_LEADERSHIP));
+ }
+ });
+ latch.start();
+
+ return latch;
+ }
+
+    /** Kinds of events that the test listeners record into the event collection. */
+    private enum TestEventType
+    {
+        /** Recorded when a latch listener's {@code isLeader()} callback fires. */
+        GAINED_LEADERSHIP,
+        /** Recorded when a latch listener's {@code notLeader()} callback fires. */
+        LOST_LEADERSHIP,
+        /** Recorded when a client reports {@code ConnectionState.CONNECTED}. */
+        GAINED_CONNECTION
+    }
+
+ private static class TestEvent {
+ private final String id;
+ private final TestEventType eventType;
+
+ public TestEvent(String id, TestEventType eventType)
+ {
+ this.id = id;
+ this.eventType = eventType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TestEvent testEvent = (TestEvent) o;
+ return Objects.equals(id, testEvent.id) && eventType ==
testEvent.eventType;
+ }
+ }
+
@Test
public void
testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws
Exception
{