This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 1027c2cd CURATOR-696. Fix double leader for LeaderLatch (#500)
1027c2cd is described below

commit 1027c2cdbbbac61436b8235b2be1729af4146d17
Author: tison <[email protected]>
AuthorDate: Tue May 21 22:59:34 2024 +0800

    CURATOR-696. Fix double leader for LeaderLatch (#500)
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: Kezhu Wang <[email protected]>
---
 curator-recipes/pom.xml                            |  6 ++
 .../framework/recipes/leader/LeaderLatch.java      | 78 +++++++++++++---------
 .../framework/recipes/leader/TestLeaderLatch.java  | 65 +++++++++++++++++-
 3 files changed, 116 insertions(+), 33 deletions(-)

diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index 927af0c2..1ac5a8b7 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -83,6 +83,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 80509dbf..4d20f9af 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -509,7 +509,7 @@ public class LeaderLatch implements Closeable {
                         getChildren();
                     }
                 } else {
-                    log.error("getChildren() failed. rc = " + 
event.getResultCode());
+                    log.error("getChildren() failed. rc = {}", 
event.getResultCode());
                 }
             }
         };
@@ -548,43 +548,57 @@ public class LeaderLatch implements Closeable {
         log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", 
id, localOurPath, sortedChildren);
 
         if (ourIndex < 0) {
-            log.error("Can't find our node. Resetting. Index: " + ourIndex);
+            log.error("Can't find our node. Resetting. Index: {}", ourIndex);
             reset();
-        } else if (ourIndex == 0) {
-            lastPathIsLeader.set(localOurPath);
-            setLeadership(true);
-        } else {
-            setLeadership(false);
-            String watchPath = sortedChildren.get(ourIndex - 1);
-            Watcher watcher = new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    if (state.get() == State.STARTED && event.getType() == 
Event.EventType.NodeDeleted) {
-                        try {
-                            getChildren();
-                        } catch (Exception ex) {
-                            ThreadUtils.checkInterrupted(ex);
-                            log.error("An error occurred checking the 
leadership.", ex);
+            return;
+        }
+
+        if (ourIndex == 0) {
+            client.getData()
+                    .inBackground((client, event) -> {
+                        final long ephemeralOwner =
+                                event.getStat() != null ? 
event.getStat().getEphemeralOwner() : -1;
+                        final long thisSessionId =
+                                
client.getZookeeperClient().getZooKeeper().getSessionId();
+                        if (ephemeralOwner != thisSessionId) {
+                            // our node is gone or is owned by a different session - reset
+                            reset();
+                        } else {
+                            lastPathIsLeader.set(localOurPath);
+                            setLeadership(true);
                         }
-                    }
-                }
-            };
+                    })
+                    .forPath(localOurPath);
+            return;
+        }
 
-            BackgroundCallback callback = new BackgroundCallback() {
-                @Override
-                public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
-                    if (event.getResultCode() == 
KeeperException.Code.NONODE.intValue()) {
-                        // previous node is gone - retry getChildren
+        setLeadership(false);
+        String watchPath = sortedChildren.get(ourIndex - 1);
+        Watcher watcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (state.get() == State.STARTED && event.getType() == 
Event.EventType.NodeDeleted) {
+                    try {
                         getChildren();
+                    } catch (Exception ex) {
+                        ThreadUtils.checkInterrupted(ex);
+                        log.error("An error occurred checking the 
leadership.", ex);
                     }
                 }
-            };
-            // use getData() instead of exists() to avoid leaving unneeded 
watchers which is a type of resource leak
-            client.getData()
-                    .usingWatcher(watcher)
-                    .inBackground(callback)
-                    .forPath(ZKPaths.makePath(latchPath, watchPath));
-        }
+            }
+        };
+
+        BackgroundCallback callback = new BackgroundCallback() {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception {
+                if (event.getResultCode() == 
KeeperException.Code.NONODE.intValue()) {
+                    // previous node is gone - retry getChildren
+                    getChildren();
+                }
+            }
+        };
+        // use getData() instead of exists() to avoid leaving unneeded 
watchers which is a type of resource leak
+        
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath,
 watchPath));
     }
 
     private void getChildren() throws Exception {
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index dc79f666..528b317f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.leader;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -72,9 +73,12 @@ import org.apache.curator.utils.CloseableUtils;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Tag(CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestLeaderLatch extends BaseClassForTests {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestLeaderLatch.class);
     private static final String PATH_NAME = "/one/two/me";
     private static final int MAX_LOOPS = 5;
 
@@ -208,6 +212,58 @@ public class TestLeaderLatch extends BaseClassForTests {
         }
     }
 
+    @Test
+    public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception 
{
+        final String latchPath = 
"/testSessionInterruptionDoNotCauseBrainSplit";
+        final Timing2 timing = new Timing2();
+        final BlockingQueue<TestEvent> events0 = new LinkedBlockingQueue<>();
+        final BlockingQueue<TestEvent> events1 = new LinkedBlockingQueue<>();
+
+        final List<Closeable> closeableResources = new ArrayList<>();
+        try {
+            final String id0 = "id0";
+            final CuratorFramework client0 = 
createAndStartClient(server.getConnectString(), timing, id0, null);
+            closeableResources.add(client0);
+            final LeaderLatch latch0 = createAndStartLeaderLatch(client0, 
latchPath, id0, events0);
+            closeableResources.add(latch0);
+
+            assertThat(events0.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id0, 
TestEventType.GAINED_LEADERSHIP));
+
+            final String id1 = "id1";
+            final CuratorFramework client1 = 
createAndStartClient(server.getConnectString(), timing, id1, null);
+            closeableResources.add(client1);
+            final LeaderLatch latch1 = createAndStartLeaderLatch(client1, 
latchPath, id1, events1);
+            closeableResources.add(latch1);
+
+            // wait for the non-leading LeaderLatch (i.e. latch1) instance to 
be done with its creation
+            // this call is time-consuming but necessary because we don't have 
a handle to detect the end of the reset
+            // call
+            timing.forWaiting().sleepABit();
+
+            assertTrue(latch0.hasLeadership());
+            assertFalse(latch1.hasLeadership());
+
+            
client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
+
+            assertThat(events1.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id1, 
TestEventType.GAINED_LEADERSHIP));
+
+            assertThat(events0.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id0, 
TestEventType.LOST_LEADERSHIP));
+            // No leadership granted to old leader after session changed, hence no brain split.
+            assertThat(events0.poll(20, TimeUnit.MILLISECONDS))
+                    .isNotEqualTo(new TestEvent(id0, 
TestEventType.GAINED_LEADERSHIP));
+        } finally {
+            // reverse is necessary for closing the LeaderLatch instances 
before closing the corresponding client
+            Collections.reverse(closeableResources);
+            closeableResources.forEach(CloseableUtils::closeQuietly);
+        }
+    }
+
     @Test
     public void testResettingOfLeadershipAfterConcurrentLeadershipChange() 
throws Exception {
         final String latchPath = "/test";
@@ -316,7 +372,9 @@ public class TestLeaderLatch extends BaseClassForTests {
 
         client.getConnectionStateListenable().addListener((client1, newState) 
-> {
             if (newState == ConnectionState.CONNECTED) {
-                events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
+                if (events != null) {
+                    events.add(new TestEvent(id, 
TestEventType.GAINED_CONNECTION));
+                }
             }
         });
 
@@ -366,6 +424,11 @@ public class TestLeaderLatch extends BaseClassForTests {
             TestEvent testEvent = (TestEvent) o;
             return Objects.equals(id, testEvent.id) && eventType == 
testEvent.eventType;
         }
+
+        @Override
+        public String toString() {
+            return "TestEvent{" + "eventType=" + eventType + ", id='" + id + 
'\'' + '}';
+        }
     }
 
     @Test

Reply via email to