This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4eacde3312265e5184708f59641451bccb649ca0
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Sat Jul 22 22:39:37 2023 -0700

    Metaclient - Leader election - leader change event notification (#2560)
    
    Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz>
---
 .../impl/zk/adapter/DataListenerAdapter.java       |   1 +
 .../leaderelection/LeaderElectionClient.java       |   4 +-
 .../LeaderElectionListenerInterface.java           |  15 ++-
 .../LeaderElectionListenerInterfaceAdapter.java    |  43 +++++++
 .../zk/TestConnectStateChangeListenerAndRetry.java |   3 +-
 .../recipes/leaderelection/TestLeaderElection.java | 135 ++++++++++++++++++++-
 .../helix/zookeeper/zkclient/ZkConnection.java     |   1 -
 7 files changed, 193 insertions(+), 9 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
index 94ae198ce..748b6ed3f 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
@@ -21,6 +21,7 @@ package org.apache.helix.metaclient.impl.zk.adapter;
 
 import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData;
 import org.apache.zookeeper.Watcher;
 
 
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 39233e979..7b13778c0 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -334,7 +334,8 @@ public class LeaderElectionClient implements AutoCloseable {
    * @return A boolean value indicating if registration is success.
    */
   public boolean subscribeLeadershipChanges(String leaderPath, 
LeaderElectionListenerInterface listener) {
-    //TODO: add converter class for LeaderElectionListenerInterface
+    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new 
LeaderElectionListenerInterfaceAdapter(listener),
+        false);
     return false;
   }
 
@@ -343,6 +344,7 @@ public class LeaderElectionClient implements AutoCloseable {
    * @param listener An implementation of LeaderElectionListenerInterface
    */
   public void unsubscribeLeadershipChanges(String leaderPath, 
LeaderElectionListenerInterface listener) {
+    _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new 
LeaderElectionListenerInterfaceAdapter(listener));
   }
 
   @Override
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
index 0436e1eb0..230fc2af1 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
@@ -24,9 +24,16 @@ package org.apache.helix.metaclient.recipes.leaderelection;
  * leader node is deleted.
  */
 public interface LeaderElectionListenerInterface {
+  enum ChangeType {
+    LEADER_ACQUIRED,
+    LEADER_LOST
+  }
+
   // When new leader is elected:
-  //                             noLeader (null)                  ->    has 
leader (new leader name)
-  // When existing leader not leader anymore:
-  //                             has Leader (prevleader name)     ->    no 
leader (null)
-  public void onLeadershipChange(String leaderPath, String prevLeader, String 
curLeader);
+  //                          ChangeType == LEADER_ACQUIRED, curLeader is 
the new leader name
+  // When no leader anymore:
+  //                         ChangeType == LEADER_LOST, curLeader is an empty 
string
+  // In the ZK implementation, since the notification does not include the changed data 
and metaclient fetches
+  // the entry when the event comes, it is possible that curLeader reflects a later state than the change that triggered the event.
+  public void onLeadershipChange(String leaderPath, ChangeType type,  String 
curLeader);
 }
\ No newline at end of file
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
new file mode 100644
index 000000000..5c64d6790
--- /dev/null
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
@@ -0,0 +1,43 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import org.apache.helix.metaclient.api.DataChangeListener;
+
+import static 
org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*;
+
+
+public class LeaderElectionListenerInterfaceAdapter implements 
DataChangeListener {
+  private final LeaderElectionListenerInterface _leaderElectionListener;
+
+  public 
LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface 
leaderElectionListener) {
+    _leaderElectionListener = leaderElectionListener;
+  }
+
+  @Override
+  public void handleDataChange(String key, Object data, ChangeType changeType) 
throws Exception {
+    switch (changeType) {
+      case  ENTRY_CREATED:
+        String newLeader = ((LeaderInfo) data).getLeaderName();
+        _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED, 
newLeader);
+        break;
+      case ENTRY_DELETED:
+        _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, "");
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LeaderElectionListenerInterfaceAdapter that = 
(LeaderElectionListenerInterfaceAdapter) o;
+    return _leaderElectionListener.equals(that._leaderElectionListener);
+  }
+
+  @Override
+  public int hashCode() {
+    return _leaderElectionListener.hashCode();
+  }
+}
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
index 36b9b2131..c74b7d7ef 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -162,7 +162,8 @@ public class TestConnectStateChangeListenerAndRetry  {
         zkMetaClient.create("/key", "value");
         Assert.fail("Create call after close should throw 
IllegalStateException");
       } catch (Exception ex) {
-        Assert.assertTrue(ex.getCause() instanceof IllegalStateException);
+        System.out.println("ex " + ex);
+        Assert.assertTrue(ex instanceof IllegalStateException);
       }
     }
     System.out.println("END 
TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + 
new Date(System.currentTimeMillis()));
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 412ecaf8b..b0b396c1a 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -2,6 +2,8 @@ package org.apache.helix.metaclient.recipes.leaderelection;
 
 import java.util.ConcurrentModificationException;
 import org.apache.helix.metaclient.MetaClientTestUtil;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
@@ -26,12 +28,14 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
 
   @Test
   public void testAcquireLeadership() throws Exception {
+    String leaderPath = LEADER_PATH + "testAcquireLeadership";
+
     // create 2 clients representing 2 participants
     LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
     LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
 
-    clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
-    clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
+    clt1.joinLeaderElectionParticipantPool(leaderPath);
+    clt2.joinLeaderElectionParticipantPool(leaderPath);
     // First client joining the leader election group should be current leader
     Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH) != null);
@@ -139,4 +143,131 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
     Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
   }
+  @Test (dependsOnMethods = "testAcquireLeadership")
+  public void testLeadershipListener() throws Exception {
+    String leaderPath = LEADER_PATH + "testLeadershipListener";
+    // create 2 clients representing 2 participants
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+    LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    final int count = 10;
+    final int[] numNewLeaderEvent = {0};
+    final int[] numLeaderGoneEvent = {0};
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2);
+    CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2);
+
+     LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
+
+       @Override
+       public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
+         if (type == ChangeType.LEADER_LOST) {
+           countDownLatchLeaderGone.countDown();
+           Assert.assertEquals(curLeader.length(), 0);
+           numLeaderGoneEvent[0]++;
+         } else if (type == ChangeType.LEADER_ACQUIRED) {
+           countDownLatchNewLeader.countDown();
+           numNewLeaderEvent[0]++;
+           Assert.assertTrue(curLeader.length()!=0);
+         } else {
+           Assert.fail();
+         }
+       }
+     };
+
+    clt3.subscribeLeadershipChanges(leaderPath, listener);
+
+    // each iteration produces four events: participant_1 becomes leader, leader gone, 
participant_2 becomes leader, leader gone
+    for (int i=0; i<count; ++i) {
+      joinPoolTestHelper(leaderPath, clt1, clt2);
+      Thread.sleep(1000);
+    }
+
+    
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
+    
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
+    Assert.assertEquals(numLeaderGoneEvent[0], count*2);
+    Assert.assertEquals(numNewLeaderEvent[0], count*2);
+
+    clt3.unsubscribeLeadershipChanges(leaderPath, listener);
+
+    // listener shouldn't receive any event after unsubscribe
+    joinPoolTestHelper(leaderPath, clt1, clt2);
+    Assert.assertEquals(numLeaderGoneEvent[0], count*2);
+    Assert.assertEquals(numNewLeaderEvent[0], count*2);
+
+    clt1.close();
+    clt2.close();
+    clt3.close();
+  }
+
+  private void joinPoolTestHelper(String leaderPath, LeaderElectionClient 
clt1, LeaderElectionClient clt2) throws Exception {
+    clt1.joinLeaderElectionParticipantPool(leaderPath);
+    clt2.joinLeaderElectionParticipantPool(leaderPath);
+
+    Thread.sleep(2000);
+
+    // clt1 gone
+    clt1.exitLeaderElectionParticipantPool(leaderPath);
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+  }
+
+  @Test (dependsOnMethods = "testLeadershipListener")
+  public void testRelinquishLeadership() throws Exception {
+    String leaderPath = LEADER_PATH + "testRelinquishLeadership";
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+    LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+
+    final int count = 1;
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+    CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
+
+    LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
+
+      @Override
+      public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
+        if (type == ChangeType.LEADER_LOST) {
+          countDownLatchLeaderGone.countDown();
+          Assert.assertEquals(curLeader.length(), 0);
+        } else if (type == ChangeType.LEADER_ACQUIRED) {
+          countDownLatchNewLeader.countDown();
+          Assert.assertTrue(curLeader.length()!=0);
+        } else {
+          Assert.fail();
+        }
+      }
+    };
+
+    clt1.joinLeaderElectionParticipantPool(leaderPath);
+    clt2.joinLeaderElectionParticipantPool(leaderPath);
+    clt3.subscribeLeadershipChanges(leaderPath, listener);
+    // clt1 gone
+    clt1.relinquishLeader(leaderPath);
+
+    // participant 1 should have gone, and a leader gone event is sent
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
+
+    clt1.exitLeaderElectionParticipantPool(leaderPath);
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+  }
+
 }
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 589425462..376409231 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -260,7 +260,6 @@ public class ZkConnection implements IZkConnection {
   private void lookupGetChildrenMethod() {
     _getChildrenMethod = doLookUpGetChildrenMethod();
 
-    System.out.println(" 
ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " + 
GETCHILDREN_PAGINATION_DISABLED);
     LOG.info("Pagination config {}={}, method to be invoked: {}",
         ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, 
GETCHILDREN_PAGINATION_DISABLED,
         _getChildrenMethod.getName());

Reply via email to