This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 52fe73818d9be5b493888943da58d2e763c8a5be Author: xyuanlu <[email protected]> AuthorDate: Sat Jul 22 21:45:29 2023 -0700 Metaclient - Leader election - Track participants and add participant info (#2562) Co-authored-by: Xiaoyuan Lu <[email protected]> --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 2 - .../leaderelection/LeaderElectionClient.java | 132 ++++++++++++++++----- .../{TestUtil.java => MetaClientTestUtil.java} | 7 +- .../apache/helix/metaclient/impl/zk/TestUtil.java | 68 +++++++++++ .../recipes/leaderelection/TestLeaderElection.java | 104 ++++++++++++++-- 5 files changed, 266 insertions(+), 47 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 691f31cde..8753747f3 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -57,14 +57,12 @@ import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkException; -import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode; import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException; diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index d5bdb735a..39233e979 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -21,11 +21,14 @@ package org.apache.helix.metaclient.recipes.leaderelection; import java.util.Arrays; import java.util.ConcurrentModificationException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; @@ -34,6 +37,7 @@ import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; import org.slf4j.Logger; @@ -68,8 +72,13 @@ public class LeaderElectionClient implements AutoCloseable { // A list of leader election group that this client joins. private Set<String> _leaderGroups = new HashSet<>(); + private Map<String, LeaderInfo> _participantInfos = new HashMap<>(); + private final static String LEADER_ENTRY_KEY = "/LEADER"; + private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS"; + private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/"; ReElectListener _reElectListener = new ReElectListener(); + ConnectStateListener _connectStateListener = new ConnectStateListener(); /** * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient @@ -90,6 +99,7 @@ public class LeaderElectionClient implements AutoCloseable { metaClientConfig.getConnectionAddress()).setZkSerializer((new LeaderInfoSerializer())).build(); _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); _metaClient.connect(); + _metaClient.subscribeStateChanges(_connectStateListener); } else { throw new MetaClientException("Unsupported store type: " + metaClientConfig.getStoreType()); } @@ -121,8 +131,8 @@ public class LeaderElectionClient implements AutoCloseable { * @throws RuntimeException if the operation is not succeeded. */ public void joinLeaderElectionParticipantPool(String leaderPath) { - // TODO: create participant entry subscribeAndTryCreateLeaderEntry(leaderPath); + createParticipantInfo(leaderPath, new LeaderInfo(_participant)); } /** @@ -134,8 +144,40 @@ public class LeaderElectionClient implements AutoCloseable { * @throws RuntimeException if the operation is not succeeded. */ public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo userInfo) { - // TODO: create participant entry with info subscribeAndTryCreateLeaderEntry(leaderPath); + + LeaderInfo participantInfo = new LeaderInfo(userInfo); + createParticipantInfo(leaderPath, participantInfo); + } + + private void createParticipantInfo(String leaderPath, LeaderInfo participantInfo) { + _participantInfos.put(leaderPath, participantInfo); + + createPathIfNotExists(leaderPath + PARTICIPANTS_ENTRY_KEY); + + try { + // try to create participant info entry, assuming leader election group node is already there + _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, participantInfo, + MetaClientInterface.EntryMode.EPHEMERAL); + } catch (MetaClientNodeExistsException ex) { + throw new ConcurrentModificationException("Already joined leader election group. ", ex); + } catch (MetaClientNoNodeException ex) { + // Leader group root entry or participant parent entry is gone after we checked or created. + // Meaning other client removed the group. Throw ConcurrentModificationException. + throw new ConcurrentModificationException( + "Other client trying to modify the leader election group at the same time, please retry.", ex); + } + } + + private void createPathIfNotExists(String path) { + if (_metaClient.exists(path) == null) { + LOG.info("{} Creating leader group directory {}.", _participant, path); + try { + _metaClient.create(path, null); + } catch (MetaClientNodeExistsException ignore) { + LOG.info("Leader election group root path already created: path {}.", path); + } + } } private void subscribeAndTryCreateLeaderEntry(String leaderPath) { @@ -144,30 +186,19 @@ public class LeaderElectionClient implements AutoCloseable { leaderInfo.setLeaderName(_participant); try { + createPathIfNotExists(leaderPath); + } catch (MetaClientNoNodeException e) { + // Parent entry missed in root path. + throw new MetaClientException("Parent entry in leaderGroup path" + leaderPath + " does not exist."); + } + + // create actual leader node + try { + LOG.info("{} joining leader group {}.", _participant, leaderPath); // try to create leader entry, assuming leader election group node is already there _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL); } catch (MetaClientNodeExistsException ex) { - LOG.info("Already a leader for group {}", leaderPath); - } catch (MetaClientNoNodeException ex) { - try { - // try to create leader path root entry - _metaClient.create(leaderPath, null); - } catch (MetaClientNodeExistsException ignored) { - // root entry created by other client, ignore - } catch (MetaClientNoNodeException e) { - // Parent entry of user provided leader election group path missing. - // (e.g. `/a/b` not created in user specified leader election group path /a/b/c/LeaderGroup) - throw new MetaClientException("Parent entry in leaderGroup path" + leaderPath + " does not exist."); - } - try { - // try to create leader node again. - _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL); - } catch (MetaClientNoNodeException e) { - // Leader group root entry is gone after we checked at outer catch block. - // Meaning other client removed the group. Throw ConcurrentModificationException. - throw new ConcurrentModificationException( - "Other client trying to modify the leader election group at the same time, please retry.", ex); - } + LOG.info("Already a leader in leader group {}.", leaderPath); } _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); @@ -222,6 +253,7 @@ public class LeaderElectionClient implements AutoCloseable { // deleting ZNode. So that handler in ReElectListener won't recreate the leader node. if (exitLeaderElectionParticipantPool) { _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY); + _metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant); } // check if current participant is the leader // read data and stats, check, and multi check + delete @@ -250,13 +282,23 @@ public class LeaderElectionClient implements AutoCloseable { * @throws RuntimeException when leader path does not exist. // TODO: define exp type */ public String getLeader(String leaderPath) { + LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY); return leaderInfo == null ? null : leaderInfo.getLeaderName(); } - public LeaderInfo getParticipantInfo(String leaderPath) { - // TODO: add getParticipantInfo impl - return null; + /** + * Get current leader. + * + * @param leaderPath The path for leader election. + * @return Returns a LeaderInfo entry. Return null if participant is not in the pool. + * */ + public LeaderInfo getParticipantInfo(String leaderPath, String participant) { + return _metaClient.get(leaderPath + PARTICIPANTS_ENTRY_PARENT + participant); + } + + public MetaClientInterface.Stat getLeaderEntryStat(String leaderPath) { + return _metaClient.exists(leaderPath + LEADER_ENTRY_KEY); } /** @@ -271,7 +313,11 @@ public class LeaderElectionClient implements AutoCloseable { * @throws RuntimeException when leader path does not exist. // TODO: define exp type */ public List<String> getParticipants(String leaderPath) { - return null; + try { + return _metaClient.getDirectChildrenKeys(leaderPath + PARTICIPANTS_ENTRY_KEY); + } catch (MetaClientNoNodeException ex) { + throw new MetaClientException("No leader election group create for path " + leaderPath, ex); + } } /** @@ -302,6 +348,8 @@ public class LeaderElectionClient implements AutoCloseable { @Override public void close() throws Exception { + _metaClient.unsubscribeConnectStateChanges(_connectStateListener); + // exit all previous joined leader election groups for (String leaderGroup : _leaderGroups) { String leaderGroupPathName = @@ -318,19 +366,43 @@ public class LeaderElectionClient implements AutoCloseable { @Override public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception { if (changeType == ChangeType.ENTRY_CREATED) { - LOG.info("new leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key); + LOG.info("new leader for leader election group {}.", key); } else if (changeType == ChangeType.ENTRY_DELETED) { if (_leaderGroups.contains(key)) { LeaderInfo lf = new LeaderInfo("LEADER"); lf.setLeaderName(_participant); try { + LOG.info("Leader gone for group {}, {} try to reelect.", key, _participant); _metaClient.create(key, lf, MetaClientInterface.EntryMode.EPHEMERAL); } catch (MetaClientNodeExistsException ex) { - LOG.info("Already a leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key); + LOG.info("Already a leader for leader election group {}.", key); } } } } } -} + class ConnectStateListener implements ConnectStateChangeListener { + + @Override + public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, + MetaClientInterface.ConnectState currentState) throws Exception { + if (prevState == MetaClientInterface.ConnectState.EXPIRED + && currentState == MetaClientInterface.ConnectState.CONNECTED) { + for (String leaderPath : _participantInfos.keySet()) { + _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath), + MetaClientInterface.EntryMode.EPHEMERAL); + } + } + } + + @Override + public void handleConnectionEstablishmentError(Throwable error) throws Exception { + + } + } + + public MetaClientInterface getMetaClient() { + return _metaClient; + } +} \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java similarity index 75% rename from meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java rename to meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java index 9b20c8e53..2c7543e31 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java @@ -1,7 +1,10 @@ package org.apache.helix.metaclient; -public class TestUtil { - public static final long WAIT_DURATION = 6 * 1000L; +import java.util.concurrent.TimeUnit; + + +public class MetaClientTestUtil { + public static final long WAIT_DURATION = TimeUnit.MINUTES.toMicros(1); public interface Verifier { boolean verify() throws Exception; diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java index 1296a72f3..fbf1ab1f3 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java @@ -25,11 +25,17 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.ZkClient; import org.apache.helix.zookeeper.zkclient.ZkConnection; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.testng.Assert; public class TestUtil { @@ -93,4 +99,66 @@ public class TestUtil { return lists; } + + public static void expireSession(ZkMetaClient client) + throws Exception { + final CountDownLatch waitNewSession = new CountDownLatch(1); + final ZkClient zkClient = client.getZkClient(); + + IZkStateListener listener = new IZkStateListener() { + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) + throws Exception { + } + + @Override + public void handleNewSession(final String sessionId) + throws Exception { + // make sure zkclient is connected again + zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS); + + waitNewSession.countDown(); + } + + @Override + public void handleSessionEstablishmentError(Throwable var1) + throws Exception { + } + }; + + zkClient.subscribeStateChanges(listener); + + ZkConnection connection = ((ZkConnection) zkClient.getConnection()); + ZooKeeper curZookeeper = connection.getZookeeper(); + String oldSessionId = Long.toHexString(curZookeeper.getSessionId()); + + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + } + }; + + final ZooKeeper dupZookeeper = + new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher, + curZookeeper.getSessionId(), curZookeeper.getSessionPasswd()); + // wait until connected, then close + while (dupZookeeper.getState() != ZooKeeper.States.CONNECTED) { + Thread.sleep(10); + } + Assert.assertEquals(dupZookeeper.getState(), ZooKeeper.States.CONNECTED, + "Fail to connect to zk using current session info"); + dupZookeeper.close(); + + // make sure session expiry really happens + waitNewSession.await(); + zkClient.unsubscribeStateChanges(listener); + + connection = (ZkConnection) zkClient.getConnection(); + curZookeeper = connection.getZookeeper(); + + String newSessionId = Long.toHexString(curZookeeper.getSessionId()); + Assert.assertFalse(newSessionId.equals(oldSessionId), + "Fail to expire current session, zk: " + curZookeeper); + } + } \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index 3a45d97ce..412ecaf8b 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -1,11 +1,15 @@ package org.apache.helix.metaclient.recipes.leaderelection; -import org.apache.helix.metaclient.TestUtil; +import java.util.ConcurrentModificationException; +import org.apache.helix.metaclient.MetaClientTestUtil; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.helix.metaclient.impl.zk.TestUtil.*; + public class TestLeaderElection extends ZkMetaClientTestBase { @@ -15,8 +19,8 @@ public class TestLeaderElection extends ZkMetaClientTestBase { public LeaderElectionClient createLeaderElectionClient(String participantName) { MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER; - MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR) - .setStoreType(storeType).build(); + MetaClientConfig config = + new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build(); return new LeaderElectionClient(config, participantName); } @@ -29,31 +33,31 @@ public class TestLeaderElection extends ZkMetaClientTestBase { clt1.joinLeaderElectionParticipantPool(LEADER_PATH); clt2.joinLeaderElectionParticipantPool(LEADER_PATH); // First client joining the leader election group should be current leader - Assert.assertTrue(TestUtil.verify(() -> { + Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH) != null); - }, TestUtil.WAIT_DURATION)); + }, MetaClientTestUtil.WAIT_DURATION)); Assert.assertNotNull(clt1.getLeader(LEADER_PATH)); Assert.assertEquals(clt1.getLeader(LEADER_PATH), clt2.getLeader(LEADER_PATH)); Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1); // client 1 exit leader election group, and client 2 should be current leader. clt1.exitLeaderElectionParticipantPool(LEADER_PATH); - Assert.assertTrue(TestUtil.verify(() -> { + Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH) != null); - }, TestUtil.WAIT_DURATION)); - Assert.assertTrue(TestUtil.verify(() -> { + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2)); - }, TestUtil.WAIT_DURATION)); + }, MetaClientTestUtil.WAIT_DURATION)); // client1 join and client2 leave. client 1 should be leader. clt1.joinLeaderElectionParticipantPool(LEADER_PATH); clt2.exitLeaderElectionParticipantPool(LEADER_PATH); - Assert.assertTrue(TestUtil.verify(() -> { + Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH) != null); - }, TestUtil.WAIT_DURATION)); - Assert.assertTrue(TestUtil.verify(() -> { + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1)); - }, TestUtil.WAIT_DURATION)); + }, MetaClientTestUtil.WAIT_DURATION)); Assert.assertTrue(clt1.isLeader(LEADER_PATH)); Assert.assertFalse(clt2.isLeader(LEADER_PATH)); @@ -61,4 +65,78 @@ public class TestLeaderElection extends ZkMetaClientTestBase { clt2.close(); } + @Test + public void testElectionPoolMembership() throws Exception { + String leaderPath = LEADER_PATH + "/testElectionPoolMembership"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + try { + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // no op + } catch (ConcurrentModificationException ex) { + // expected + Assert.assertEquals(ex.getClass().getName(), "java.util.ConcurrentModificationException"); + } + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath)); + Assert.assertNotNull(clt1.getLeader(leaderPath)); + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + + // clt1 gone + clt1.relinquishLeader(leaderPath); + clt1.exitLeaderElectionParticipantPool(leaderPath); + clt2.exitLeaderElectionParticipantPool(leaderPath); + + Assert.assertNull(clt2.getParticipantInfo(LEADER_PATH, PARTICIPANT_NAME2)); + } + + @Test + public void testSessionExpire() throws Exception { + String leaderPath = LEADER_PATH + "/testSessionExpire"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + try { + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // no op + } catch (ConcurrentModificationException ex) { + // expected + Assert.assertEquals(ex.getClass().getName(), "java.util.ConcurrentModificationException"); + } + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + // a leader should be up + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + // session expire and reconnect + expireSession((ZkMetaClient) clt1.getMetaClient()); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath)); + Assert.assertNotNull(clt1.getLeader(leaderPath)); + // when session recreated, participant info node should maintain + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt1.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + } }
