This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4eacde3312265e5184708f59641451bccb649ca0 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Sat Jul 22 22:39:37 2023 -0700 Metaclient - Leader election - leader change event notification (#2560) Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz> --- .../impl/zk/adapter/DataListenerAdapter.java | 1 + .../leaderelection/LeaderElectionClient.java | 4 +- .../LeaderElectionListenerInterface.java | 15 ++- .../LeaderElectionListenerInterfaceAdapter.java | 43 +++++++ .../zk/TestConnectStateChangeListenerAndRetry.java | 3 +- .../recipes/leaderelection/TestLeaderElection.java | 135 ++++++++++++++++++++- .../helix/zookeeper/zkclient/ZkConnection.java | 1 - 7 files changed, 193 insertions(+), 9 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java index 94ae198ce..748b6ed3f 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java @@ -21,6 +21,7 @@ package org.apache.helix.metaclient.impl.zk.adapter; import org.apache.helix.metaclient.api.DataChangeListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData; import org.apache.zookeeper.Watcher; diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index 39233e979..7b13778c0 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -334,7 +334,8 @@ public class LeaderElectionClient implements 
AutoCloseable { * @return A boolean value indicating if registration is success. */ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { - //TODO: add converter class for LeaderElectionListenerInterface + _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener), + false); return false; } @@ -343,6 +344,7 @@ public class LeaderElectionClient implements AutoCloseable { * @param listener An implementation of LeaderElectionListenerInterface */ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { + _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new LeaderElectionListenerInterfaceAdapter(listener)); } @Override diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java index 0436e1eb0..230fc2af1 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java @@ -24,9 +24,16 @@ package org.apache.helix.metaclient.recipes.leaderelection; * leader node is deleted. 
*/ public interface LeaderElectionListenerInterface { + enum ChangeType { + LEADER_ACQUIRED, + LEADER_LOST + } + // When new leader is elected: - // noLeader (null) -> has leader (new leader name) - // When existing leader not leader anymore: - // has Leader (prevleader name) -> no leader (null) - public void onLeadershipChange(String leaderPath, String prevLeader, String curLeader); + // ChangeType == LEADER_ACQUIRED, curLeader is the new leader name + // When no leader anymore: + // ChangeType == LEADER_LOST, curLeader is an empty string + // In ZK implementation, since notification does not include changed data and metaclient fetches + // the entry when event comes, it is possible that curLeader reflects a newer state than the one that triggered the event. + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader); } \ No newline at end of file diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java new file mode 100644 index 000000000..5c64d6790 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java @@ -0,0 +1,43 @@ +package org.apache.helix.metaclient.recipes.leaderelection; + +import org.apache.helix.metaclient.api.DataChangeListener; + +import static org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*; + + +public class LeaderElectionListenerInterfaceAdapter implements DataChangeListener { + private final LeaderElectionListenerInterface _leaderElectionListener; + + public LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface leaderElectionListener) { + _leaderElectionListener = leaderElectionListener; + } + + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception { + switch (changeType) { + case
ENTRY_CREATED: + String newLeader = ((LeaderInfo) data).getLeaderName(); + _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED, newLeader); + break; + case ENTRY_DELETED: + _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, ""); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LeaderElectionListenerInterfaceAdapter that = (LeaderElectionListenerInterfaceAdapter) o; + return _leaderElectionListener.equals(that._leaderElectionListener); + } + + @Override + public int hashCode() { + return _leaderElectionListener.hashCode(); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java index 36b9b2131..c74b7d7ef 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java @@ -162,7 +162,8 @@ public class TestConnectStateChangeListenerAndRetry { zkMetaClient.create("/key", "value"); Assert.fail("Create call after close should throw IllegalStateException"); } catch (Exception ex) { - Assert.assertTrue(ex.getCause() instanceof IllegalStateException); + System.out.println("ex " + ex); + Assert.assertTrue(ex instanceof IllegalStateException); } } System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis())); diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index 412ecaf8b..b0b396c1a 100644 --- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -2,6 +2,8 @@ package org.apache.helix.metaclient.recipes.leaderelection; import java.util.ConcurrentModificationException; import org.apache.helix.metaclient.MetaClientTestUtil; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.helix.metaclient.factories.MetaClientConfig; import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; @@ -26,12 +28,14 @@ public class TestLeaderElection extends ZkMetaClientTestBase { @Test public void testAcquireLeadership() throws Exception { + String leaderPath = LEADER_PATH + "testAcquireLeadership"; + // create 2 clients representing 2 participants LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); - clt1.joinLeaderElectionParticipantPool(LEADER_PATH); - clt2.joinLeaderElectionParticipantPool(LEADER_PATH); + clt1.joinLeaderElectionParticipantPool(leaderPath); + clt2.joinLeaderElectionParticipantPool(leaderPath); // First client joining the leader election group should be current leader Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH) != null); @@ -139,4 +143,131 @@ public class TestLeaderElection extends ZkMetaClientTestBase { Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); } + @Test (dependsOnMethods = "testAcquireLeadership") + public void testLeadershipListener() throws Exception { + String leaderPath = LEADER_PATH + "testLeadershipListener"; + // create 2 clients representing 2 participants + 
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2); + + final int count = 10; + final int[] numNewLeaderEvent = {0}; + final int[] numLeaderGoneEvent = {0}; + CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2); + CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2); + + LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { + + @Override + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { + if (type == ChangeType.LEADER_LOST) { + countDownLatchLeaderGone.countDown(); + Assert.assertEquals(curLeader.length(), 0); + numLeaderGoneEvent[0]++; + } else if (type == ChangeType.LEADER_ACQUIRED) { + countDownLatchNewLeader.countDown(); + numNewLeaderEvent[0]++; + Assert.assertTrue(curLeader.length()!=0); + } else { + Assert.fail(); + } + } + }; + + clt3.subscribeLeadershipChanges(leaderPath, listener); + + // each iteration will be participant_1 is new leader, leader gone, participant_2 is new leader, leader gone + for (int i=0; i<count; ++i) { + joinPoolTestHelper(leaderPath, clt1, clt2); + Thread.sleep(1000); + } + + Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); + Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); + Assert.assertEquals(numLeaderGoneEvent[0], count*2); + Assert.assertEquals(numNewLeaderEvent[0], count*2); + + clt3.unsubscribeLeadershipChanges(leaderPath, listener); + + // listener shouldn't receive any event after unsubscribe + joinPoolTestHelper(leaderPath, clt1, clt2); + Assert.assertEquals(numLeaderGoneEvent[0], count*2); + Assert.assertEquals(numNewLeaderEvent[0], count*2); + + clt1.close(); + clt2.close(); + clt3.close(); + } + + private void 
joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2) throws Exception { + clt1.joinLeaderElectionParticipantPool(leaderPath); + clt2.joinLeaderElectionParticipantPool(leaderPath); + + Thread.sleep(2000); + + // clt1 gone + clt1.exitLeaderElectionParticipantPool(leaderPath); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2)); + }, MetaClientTestUtil.WAIT_DURATION)); + + clt2.exitLeaderElectionParticipantPool(leaderPath); + + } + + @Test (dependsOnMethods = "testLeadershipListener") + public void testRelinquishLeadership() throws Exception { + String leaderPath = LEADER_PATH + "testRelinquishLeadership"; + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2); + + + final int count = 1; + CountDownLatch countDownLatchNewLeader = new CountDownLatch(count); + CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count); + + LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { + + @Override + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { + if (type == ChangeType.LEADER_LOST) { + countDownLatchLeaderGone.countDown(); + Assert.assertEquals(curLeader.length(), 0); + } else if (type == ChangeType.LEADER_ACQUIRED) { + countDownLatchNewLeader.countDown(); + Assert.assertTrue(curLeader.length()!=0); + } else { + Assert.fail(); + } + } + }; + + clt1.joinLeaderElectionParticipantPool(leaderPath); + clt2.joinLeaderElectionParticipantPool(leaderPath); + clt3.subscribeLeadershipChanges(leaderPath, listener); + // clt1 gone + clt1.relinquishLeader(leaderPath); + + // participant 1 should have 
gone, and a leader gone event is sent + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION, TimeUnit.MILLISECONDS)); + + clt1.exitLeaderElectionParticipantPool(leaderPath); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2)); + }, MetaClientTestUtil.WAIT_DURATION)); + + clt2.exitLeaderElectionParticipantPool(leaderPath); + } + } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java index 589425462..376409231 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java @@ -260,7 +260,6 @@ public class ZkConnection implements IZkConnection { private void lookupGetChildrenMethod() { _getChildrenMethod = doLookUpGetChildrenMethod(); - System.out.println(" ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " + GETCHILDREN_PAGINATION_DISABLED); LOG.info("Pagination config {}={}, method to be invoked: {}", ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED, _getChildrenMethod.getName());