This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 29337efc8 Leader election client - Proactively send leader gone event
when disconnect from ZK (#2585)
29337efc8 is described below
commit 29337efc893eeab1cced5fe3786db20d5ce141b4
Author: xyuanlu <[email protected]>
AuthorDate: Tue Aug 15 17:15:27 2023 -0700
Leader election client - Proactively send leader gone event when disconnect
from ZK (#2585)
---
.../leaderelection/LeaderElectionClient.java | 64 ++++--
.../LeaderElectionListenerInterfaceAdapter.java | 29 ++-
.../zk/TestConnectStateChangeListenerAndRetry.java | 32 +--
.../metaclient/impl/zk/TestStressZkClient.java | 3 +-
.../apache/helix/metaclient/impl/zk/TestUtil.java | 24 ++
.../leaderelection/LeaderElectionPuppy.java | 5 +
.../recipes/leaderelection/TestLeaderElection.java | 247 ++++++++++++++-------
.../TestMultiClientLeaderElection.java | 3 +
8 files changed, 273 insertions(+), 134 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 7b13778c0..3bcf09ceb 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -33,6 +33,7 @@ import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
@@ -83,7 +84,7 @@ public class LeaderElectionClient implements AutoCloseable {
/**
* Construct a LeaderElectionClient using a user passed in
leaderElectionConfig. It creates a MetaClient
* instance underneath.
- * When MetaClient is auto closed be cause of being disconnected and auto
retry connection timed out, A new
+ * When MetaClient is auto closed because of being disconnected and auto
retry connection timed out, a new
* MetaClient instance will be created and keeps retry connection.
*
* @param metaClientConfig The config used to create an metaclient.
@@ -257,20 +258,24 @@ public class LeaderElectionClient implements
AutoCloseable {
}
// check if current participant is the leader
// read data and stats, check, and multi check + delete
- ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup =
_metaClient.getDataAndStat(key);
- if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
- int expectedVersion = tup.right.getVersion();
- List<Op> ops = Arrays.asList(Op.check(key, expectedVersion),
Op.delete(key, expectedVersion));
- //Execute transactional support on operations
- List<OpResult> opResults = _metaClient.transactionOP(ops);
- if (opResults.get(0).getType() == ERRORRESULT) {
- if (isLeader(leaderPath)) {
- // Participant re-elected as leader.
- throw new ConcurrentModificationException("Concurrent operation,
please retry");
- } else {
- LOG.info("Someone else is already leader");
+ try {
+ ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup =
_metaClient.getDataAndStat(key);
+ if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+ int expectedVersion = tup.right.getVersion();
+ List<Op> ops = Arrays.asList(Op.check(key, expectedVersion),
Op.delete(key, expectedVersion));
+ //Execute transactional support on operations
+ List<OpResult> opResults = _metaClient.transactionOP(ops);
+ if (opResults.get(0).getType() == ERRORRESULT) {
+ if (isLeader(leaderPath)) {
+ // Participant re-elected as leader.
+ throw new ConcurrentModificationException("Concurrent operation,
please retry");
+ } else {
+ LOG.info("Someone else is already leader");
+ }
}
}
+ } catch (MetaClientNoNodeException ex) {
+ LOG.info("No Leader for participant pool {} when exit the pool",
leaderPath);
}
}
@@ -334,8 +339,10 @@ public class LeaderElectionClient implements AutoCloseable
{
* @return A boolean value indicating if registration is success.
*/
public boolean subscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
- _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new
LeaderElectionListenerInterfaceAdapter(listener),
- false);
+ LeaderElectionListenerInterfaceAdapter adapter = new
LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+ _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
+ adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe
event when path is not there
+ _metaClient.subscribeStateChanges(adapter);
return false;
}
@@ -344,7 +351,10 @@ public class LeaderElectionClient implements AutoCloseable
{
* @param listener An implementation of LeaderElectionListenerInterface
*/
public void unsubscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
- _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new
LeaderElectionListenerInterfaceAdapter(listener));
+ LeaderElectionListenerInterfaceAdapter adapter = new
LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+ _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
+ );
+ _metaClient.unsubscribeConnectStateChanges(adapter);
}
@Override
@@ -395,6 +405,9 @@ public class LeaderElectionClient implements AutoCloseable {
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, _participantInfos.get(leaderPath),
MetaClientInterface.EntryMode.EPHEMERAL);
}
+ } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
+ && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+ touchLeaderNode();
}
}
@@ -404,6 +417,25 @@ public class LeaderElectionClient implements AutoCloseable
{
}
}
+ private void touchLeaderNode() {
+ for (String leaderPath : _leaderGroups) {
+ String key = leaderPath;
+ ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup =
_metaClient.getDataAndStat(key);
+ if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+ int expectedVersion = tup.right.getVersion();
+ try {
+ _metaClient.set(key, tup.left, expectedVersion);
+ } catch (MetaClientNoNodeException ex) {
+ LOG.info("leaderPath {} gone when retouch leader node.", key);
+ } catch (MetaClientBadVersionException e) {
+ LOG.info("New leader for leaderPath {} when retouch leader node.",
key);
+ } catch (MetaClientException ex) {
+ LOG.warn("Failed to touch {} when reconnected.", key, ex);
+ }
+ }
+ }
+ }
+
public MetaClientInterface getMetaClient() {
return _metaClient;
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
index 5c64d6790..ee790ac82 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
@@ -1,14 +1,18 @@
package org.apache.helix.metaclient.recipes.leaderelection;
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
import static
org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*;
-public class LeaderElectionListenerInterfaceAdapter implements
DataChangeListener {
+public class LeaderElectionListenerInterfaceAdapter implements
DataChangeListener, ConnectStateChangeListener {
+ private String _leaderPath;
private final LeaderElectionListenerInterface _leaderElectionListener;
- public
LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface
leaderElectionListener) {
+ public LeaderElectionListenerInterfaceAdapter(String leaderPath,
LeaderElectionListenerInterface leaderElectionListener) {
+ _leaderPath = leaderPath;
_leaderElectionListener = leaderElectionListener;
}
@@ -16,11 +20,12 @@ public class LeaderElectionListenerInterfaceAdapter
implements DataChangeListene
public void handleDataChange(String key, Object data, ChangeType changeType)
throws Exception {
switch (changeType) {
case ENTRY_CREATED:
+ case ENTRY_UPDATE:
String newLeader = ((LeaderInfo) data).getLeaderName();
- _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED,
newLeader);
+ _leaderElectionListener.onLeadershipChange(_leaderPath,
LEADER_ACQUIRED, newLeader);
break;
case ENTRY_DELETED:
- _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, "");
+ _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST,
"");
}
}
@@ -40,4 +45,20 @@ public class LeaderElectionListenerInterfaceAdapter
implements DataChangeListene
public int hashCode() {
return _leaderElectionListener.hashCode();
}
+
+ @Override
+ public void handleConnectStateChanged(MetaClientInterface.ConnectState
prevState,
+ MetaClientInterface.ConnectState currentState) throws Exception {
+ if (currentState == MetaClientInterface.ConnectState.DISCONNECTED) {
+ // when disconnected, notify leader lost even though the ephemeral node
is not gone until the session expires
+ // Leader election client will touch the node if reconnect before expire
+ _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, "");
+ }
+
+ }
+
+ @Override
+ public void handleConnectionEstablishmentError(Throwable error) throws
Exception {
+
+ }
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
index 086db51c7..f088140c6 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -48,32 +48,14 @@ import org.testng.annotations.Test;
import static
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
import static
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
public class TestConnectStateChangeListenerAndRetry {
- protected static final String ZK_ADDR = "localhost:2181";
+ protected static final String ZK_ADDR = "localhost:2184";
protected static ZkServer _zkServer;
- private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
- private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
- private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
-
- /**
- * Simulate a zk state change by calling {@link
ZkClient#process(WatchedEvent)} directly
- * This need to be done in a separate thread to simulate ZkClient
eventThread.
- */
- private static void simulateZkStateReconnected(ZkClient zkClient) throws
InterruptedException {
- WatchedEvent event =
- new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Disconnected,
- null);
- zkClient.process(event);
-
- Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
-
- event = new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected,
- null);
- zkClient.process(event);
- }
+
@BeforeTest
public void prepare() {
@@ -82,11 +64,6 @@ public class TestConnectStateChangeListenerAndRetry {
_zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
}
- @AfterTest
- public void cleanUp() {
- System.out.println("END TestConnectStateChangeListenerAndRetry at " + new
Date(System.currentTimeMillis()));
- }
-
@Test
public void testConnectState() {
System.out.println("STARTING
TestConnectStateChangeListenerAndRetry.testConnectState at " + new
Date(System.currentTimeMillis()));
@@ -115,7 +92,7 @@ public class TestConnectStateChangeListenerAndRetry {
@Override
public void run() {
try {
- simulateZkStateReconnected(zkMetaClient.getZkClient());
+ simulateZkStateReconnected(zkMetaClient);
} catch (InterruptedException e) {
Assert.fail("Exception in simulateZkStateReconnected", e);
}
@@ -170,6 +147,7 @@ public class TestConnectStateChangeListenerAndRetry {
} catch (Exception ex) {
Assert.assertTrue(ex instanceof IllegalStateException);
}
+ zkMetaClient.unsubscribeConnectStateChanges(listener);
}
System.out.println("END
TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " +
new Date(System.currentTimeMillis()));
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
index 8a3f9c115..6f358f0e8 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
@@ -48,7 +48,8 @@ public class TestStressZkClient extends ZkMetaClientTestBase {
}
@AfterTest
- private void tearDown() {
+ @Override
+ public void cleanUp() {
this._zkMetaClient.close();
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index fbf1ab1f3..067fe3eb5 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -40,6 +40,10 @@ import org.testng.Assert;
public class TestUtil {
+ public static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+ public static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+ public static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
static java.lang.reflect.Field getField(Class clazz, String fieldName)
throws NoSuchFieldException {
try {
@@ -161,4 +165,24 @@ public class TestUtil {
"Fail to expire current session, zk: " + curZookeeper);
}
+
+
+ /**
+ * Simulate a zk state change by calling {@link
ZkClient#process(WatchedEvent)} directly
+ * This needs to be done in a separate thread to simulate ZkClient
eventThread.
+ */
+ public static void simulateZkStateReconnected(ZkMetaClient client) throws
InterruptedException {
+ final ZkClient zkClient = client.getZkClient();
+ WatchedEvent event =
+ new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Disconnected,
+ null);
+ zkClient.process(event);
+
+ Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+ event = new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected,
+ null);
+ zkClient.process(event);
+ }
+
}
\ No newline at end of file
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
index 5f1fdf631..3f123d3ac 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
@@ -82,6 +82,11 @@ public class LeaderElectionPuppy extends AbstractPuppy {
_leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup);
} catch (MetaClientException ignore) {
// already leave the pool. OK to throw exception.
+ } finally {
+ try {
+ _leaderElectionClient.close();
+ } catch (Exception e) {
+ }
}
}
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 71d85fdb9..75917623c 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -5,9 +5,13 @@ import org.apache.helix.metaclient.MetaClientTestUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.TestUtil;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
+import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;
import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
@@ -19,13 +23,24 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
private static final String PARTICIPANT_NAME2 = "participant_2";
private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1";
- public static LeaderElectionClient createLeaderElectionClient(String
participantName) {
+ public static LeaderElectionClient createLeaderElectionClient(String
participantName) {
MetaClientConfig.StoreType storeType =
MetaClientConfig.StoreType.ZOOKEEPER;
MetaClientConfig config =
new
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build();
return new LeaderElectionClient(config, participantName);
}
+ @AfterTest
+ @Override
+ public void cleanUp() {
+ ZkMetaClientConfig config = new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+ .build();
+ try (ZkMetaClient<ZNRecord> client = new ZkMetaClient<>(config)) {
+ client.connect();
+ client.recursiveDelete(LEADER_PATH);
+ }
+ }
+
@Test
public void testAcquireLeadership() throws Exception {
System.out.println("START TestLeaderElection.testAcquireLeadership");
@@ -47,7 +62,6 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
Assert.assertEquals(clt1.getLeader(leaderPath),
clt2.getLeader(leaderPath));
Assert.assertEquals(clt1.getLeader(leaderPath), PARTICIPANT_NAME1);
-
// client 1 exit leader election group, and client 2 should be current
leader.
clt1.exitLeaderElectionParticipantPool(leaderPath);
@@ -75,7 +89,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
System.out.println("END TestLeaderElection.testAcquireLeadership");
}
- @Test
+ @Test(dependsOnMethods = "testAcquireLeadership")
public void testElectionPoolMembership() throws Exception {
System.out.println("START TestLeaderElection.testElectionPoolMembership");
String leaderPath = LEADER_PATH + "/_testElectionPoolMembership";
@@ -111,49 +125,12 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
clt2.exitLeaderElectionParticipantPool(leaderPath);
Assert.assertNull(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2));
+ clt1.close();
+ clt2.close();
System.out.println("END TestLeaderElection.testElectionPoolMembership");
}
- @Test
- public void testSessionExpire() throws Exception {
- System.out.println("START TestLeaderElection.testSessionExpire");
- String leaderPath = LEADER_PATH + "/_testSessionExpire";
- LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
- participantInfo.setSimpleField("Key1", "value1");
- LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
- participantInfo2.setSimpleField("Key2", "value2");
- LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
- LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
-
- clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
- try {
- clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); //
no op
- } catch (ConcurrentModificationException ex) {
- // expected
- Assert.assertEquals(ex.getClass().getName(),
"java.util.ConcurrentModificationException");
- }
- clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
- // a leader should be up
- Assert.assertTrue(MetaClientTestUtil.verify(() -> {
- return (clt1.getLeader(leaderPath) != null);
- }, MetaClientTestUtil.WAIT_DURATION));
-
- // session expire and reconnect
- expireSession((ZkMetaClient) clt1.getMetaClient());
-
- Assert.assertTrue(MetaClientTestUtil.verify(() -> {
- return (clt1.getLeader(leaderPath) != null);
- }, MetaClientTestUtil.WAIT_DURATION));
- Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
- Assert.assertNotNull(clt1.getLeader(leaderPath));
- // when session recreated, participant info node should maintain
- Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
- Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
- Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
- Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
- System.out.println("END TestLeaderElection.testSessionExpire");
- }
- @Test (dependsOnMethods = "testAcquireLeadership")
+ @Test(dependsOnMethods = "testElectionPoolMembership")
public void testLeadershipListener() throws Exception {
System.out.println("START TestLeaderElection.testLeadershipListener");
String leaderPath = LEADER_PATH + "/testLeadershipListener";
@@ -165,45 +142,45 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
final int count = 10;
final int[] numNewLeaderEvent = {0};
final int[] numLeaderGoneEvent = {0};
- CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2);
- CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2);
-
- LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
-
- @Override
- public void onLeadershipChange(String leaderPath, ChangeType type,
String curLeader) {
- if (type == ChangeType.LEADER_LOST) {
- countDownLatchLeaderGone.countDown();
- Assert.assertEquals(curLeader.length(), 0);
- numLeaderGoneEvent[0]++;
- } else if (type == ChangeType.LEADER_ACQUIRED) {
- countDownLatchNewLeader.countDown();
- numNewLeaderEvent[0]++;
- Assert.assertTrue(curLeader.length()!=0);
- } else {
- Assert.fail();
- }
- }
- };
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(count * 2);
+ CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count * 2);
+
+ LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
+
+ @Override
+ public void onLeadershipChange(String leaderPath, ChangeType type,
String curLeader) {
+ if (type == ChangeType.LEADER_LOST) {
+ countDownLatchLeaderGone.countDown();
+ Assert.assertEquals(curLeader.length(), 0);
+ numLeaderGoneEvent[0]++;
+ } else if (type == ChangeType.LEADER_ACQUIRED) {
+ countDownLatchNewLeader.countDown();
+ numNewLeaderEvent[0]++;
+ Assert.assertTrue(curLeader.length() != 0);
+ } else {
+ Assert.fail();
+ }
+ }
+ };
clt3.subscribeLeadershipChanges(leaderPath, listener);
// each iteration will be participant_1 is new leader, leader gone,
participant_2 is new leader, leader gone
- for (int i=0; i<count; ++i) {
+ for (int i = 0; i < count; ++i) {
joinPoolTestHelper(leaderPath, clt1, clt2);
Thread.sleep(1000);
}
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
- Assert.assertEquals(numLeaderGoneEvent[0], count*2);
- Assert.assertEquals(numNewLeaderEvent[0], count*2);
+ Assert.assertEquals(numLeaderGoneEvent[0], count * 2);
+ Assert.assertEquals(numNewLeaderEvent[0], count * 2);
clt3.unsubscribeLeadershipChanges(leaderPath, listener);
// listener shouldn't receive any event after unsubscribe
joinPoolTestHelper(leaderPath, clt1, clt2);
- Assert.assertEquals(numLeaderGoneEvent[0], count*2);
- Assert.assertEquals(numNewLeaderEvent[0], count*2);
+ Assert.assertEquals(numLeaderGoneEvent[0], count * 2);
+ Assert.assertEquals(numNewLeaderEvent[0], count * 2);
clt1.close();
clt2.close();
@@ -211,13 +188,46 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
System.out.println("END TestLeaderElection.testLeadershipListener");
}
- private void joinPoolTestHelper(String leaderPath, LeaderElectionClient
clt1, LeaderElectionClient clt2) throws Exception {
+ @Test(dependsOnMethods = "testLeadershipListener")
+ public void testRelinquishLeadership() throws Exception {
+ System.out.println("START TestLeaderElection.testRelinquishLeadership");
+ String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+ LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ final int count = 1;
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+ CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
+
+ LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
+
+ @Override
+ public void onLeadershipChange(String leaderPath, ChangeType type,
String curLeader) {
+ if (type == ChangeType.LEADER_LOST) {
+ countDownLatchLeaderGone.countDown();
+ Assert.assertEquals(curLeader.length(), 0);
+ } else if (type == ChangeType.LEADER_ACQUIRED) {
+ countDownLatchNewLeader.countDown();
+ Assert.assertTrue(curLeader.length() != 0);
+ } else {
+ Assert.fail();
+ }
+ }
+ };
+
clt1.joinLeaderElectionParticipantPool(leaderPath);
clt2.joinLeaderElectionParticipantPool(leaderPath);
+ clt3.subscribeLeadershipChanges(leaderPath, listener);
+ // clt1 gone
+ clt1.relinquishLeader(leaderPath);
- Thread.sleep(2000);
+ // participant 1 should have gone, and a leader gone event is sent
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
- // clt1 gone
clt1.exitLeaderElectionParticipantPool(leaderPath);
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(leaderPath) != null);
@@ -227,20 +237,63 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
}, MetaClientTestUtil.WAIT_DURATION));
clt2.exitLeaderElectionParticipantPool(leaderPath);
-
+ clt1.close();
+ clt2.close();
+ clt3.close();
+ System.out.println("END TestLeaderElection.testRelinquishLeadership");
}
- @Test (dependsOnMethods = "testLeadershipListener")
- public void testRelinquishLeadership() throws Exception {
- System.out.println("START TestLeaderElection.testRelinquishLeadership");
- String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
+ @Test(dependsOnMethods = "testAcquireLeadership")
+ public void testSessionExpire() throws Exception {
+ System.out.println("START TestLeaderElection.testSessionExpire");
+ String leaderPath = LEADER_PATH + "/_testSessionExpire";
+ LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+ participantInfo.setSimpleField("Key1", "value1");
+ LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+ participantInfo2.setSimpleField("Key2", "value2");
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
- LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ try {
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); //
no op
+ } catch (ConcurrentModificationException ex) {
+ // expected
+ Assert.assertEquals(ex.getClass().getName(),
"java.util.ConcurrentModificationException");
+ }
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+ // a leader should be up
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ // session expire and reconnect
+ expireSession((ZkMetaClient) clt1.getMetaClient());
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+ Assert.assertNotNull(clt1.getLeader(leaderPath));
+ // when session recreated, participant info node should maintain
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+ clt1.close();
+ clt2.close();
+ System.out.println("END TestLeaderElection.testSessionExpire");
+ }
+
+ @Test(dependsOnMethods = "testSessionExpire")
+ public void testClientDisconnectAndReconnectBeforeExpire() throws Exception {
+ System.out.println("START
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
+ String leaderPath = LEADER_PATH +
"/testClientDisconnectAndReconnectBeforeExpire";
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
final int count = 1;
- CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(count + 1);
CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
@@ -250,27 +303,51 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
if (type == ChangeType.LEADER_LOST) {
countDownLatchLeaderGone.countDown();
Assert.assertEquals(curLeader.length(), 0);
+ System.out.println("gone leader");
} else if (type == ChangeType.LEADER_ACQUIRED) {
countDownLatchNewLeader.countDown();
- Assert.assertTrue(curLeader.length()!=0);
+ Assert.assertTrue(curLeader.length() != 0);
+ System.out.println("new leader");
} else {
Assert.fail();
}
}
};
+ clt1.subscribeLeadershipChanges(leaderPath, listener);
clt1.joinLeaderElectionParticipantPool(leaderPath);
clt2.joinLeaderElectionParticipantPool(leaderPath);
- clt3.subscribeLeadershipChanges(leaderPath, listener);
- // clt1 gone
- clt1.relinquishLeader(leaderPath);
-
- // participant 1 should have gone, and a leader gone event is sent
+ // check leader node version before we simulate disconnect.
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(leaderPath) != null);
}, MetaClientTestUtil.WAIT_DURATION));
+ int leaderNodeVersion = ((ZkMetaClient)
clt1.getMetaClient()).exists(leaderPath + "/LEADER").getVersion();
+ System.out.println("version " + leaderNodeVersion);
+
+ // clt1 disconnected and reconnected before session expire
+ simulateZkStateReconnected((ZkMetaClient) clt1.getMetaClient());
+
+
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
+ leaderNodeVersion = ((ZkMetaClient)
clt2.getMetaClient()).exists(leaderPath + "/LEADER").getVersion();
+ System.out.println("version " + leaderNodeVersion);
+
+ clt1.exitLeaderElectionParticipantPool(leaderPath);
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+ clt1.close();
+ clt2.close();
+ System.out.println("END
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
+ }
+
+ private void joinPoolTestHelper(String leaderPath, LeaderElectionClient
clt1, LeaderElectionClient clt2)
+ throws Exception {
+ clt1.joinLeaderElectionParticipantPool(leaderPath);
+ clt2.joinLeaderElectionParticipantPool(leaderPath);
+
+ Thread.sleep(2000);
+
+ // clt1 gone
clt1.exitLeaderElectionParticipantPool(leaderPath);
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(leaderPath) != null);
@@ -280,7 +357,5 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
}, MetaClientTestUtil.WAIT_DURATION));
clt2.exitLeaderElectionParticipantPool(leaderPath);
- System.out.println("END TestLeaderElection.testRelinquishLeadership");
}
-
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
index 0fe424501..4fb508790 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
@@ -51,11 +51,14 @@ public class TestMultiClientLeaderElection extends
ZkMetaClientTestBase {
_zkMetaClient.create("/Parent/a", "");
}
@AfterTest
+ @Override
public void cleanUp() {
try {
_zkMetaClient.recursiveDelete(_leaderElectionGroup);
} catch (MetaClientException ex) {
_zkMetaClient.recursiveDelete(_leaderElectionGroup);
+ } finally {
+ _zkMetaClient.close();
}
}