This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 49c29dc48 Fix race condition when reconnecting in leader election client.
(#2814)
49c29dc48 is described below
commit 49c29dc4821b7671b752f701addd96ba85c19ab0
Author: xyuanlu <[email protected]>
AuthorDate: Tue Jun 25 22:36:51 2024 -0700
Fix race condition when reconnecting in leader election client. (#2814)
Fix race condition when reconnecting in leader election client.
---
.../leaderelection/LeaderElectionClient.java | 27 ++++++++++++------
.../apache/helix/metaclient/impl/zk/TestUtil.java | 18 ++++++++++++
.../recipes/leaderelection/TestLeaderElection.java | 32 ++++++++++++++++++++--
3 files changed, 66 insertions(+), 11 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index ae7d9c9fa..fc25289a8 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -38,7 +38,6 @@ import
org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.slf4j.Logger;
@@ -167,6 +166,7 @@ public class LeaderElectionClient implements AutoCloseable {
// try to create participant info entry, assuming leader election group
node is already there
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, participantInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
+ LOG.info("Participant {} joined leader group {}.", _participant,
leaderPath);
} catch (MetaClientNodeExistsException ex) {
throw new ConcurrentModificationException("Already joined leader
election group. ", ex);
} catch (MetaClientNoNodeException ex) {
@@ -261,6 +261,7 @@ public class LeaderElectionClient implements AutoCloseable {
// deleting ZNode. So that handler in ReElectListener won't recreate the
leader node.
if (exitLeaderElectionParticipantPool) {
_leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+ LOG.info("Leaving leader election pool {}.", leaderPath);
_metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant);
}
// check if current participant is the leader
@@ -272,12 +273,13 @@ public class LeaderElectionClient implements
AutoCloseable {
List<Op> ops = Arrays.asList(Op.check(key, expectedVersion),
Op.delete(key, expectedVersion));
//Execute transactional support on operations
List<OpResult> opResults = _metaClient.transactionOP(ops);
+ LOG.info("Try relinquish leader {}.", leaderPath);
if (opResults.get(0).getType() == ERRORRESULT) {
if (isLeader(leaderPath)) {
// Participant re-elected as leader.
throw new ConcurrentModificationException("Concurrent operation,
please retry");
} else {
- LOG.info("Someone else is already leader");
+ LOG.info("Someone else is already leader when relinquishing
leadership for path {}.", leaderPath);
}
}
}
@@ -366,7 +368,7 @@ public class LeaderElectionClient implements AutoCloseable {
@Override
public void close() throws Exception {
-
+ LOG.info("Closing leader election client.");
_metaClient.unsubscribeConnectStateChanges(_connectStateListener);
// exit all previous joined leader election groups
@@ -406,14 +408,20 @@ public class LeaderElectionClient implements
AutoCloseable {
@Override
public void handleConnectStateChanged(MetaClientInterface.ConnectState
prevState,
MetaClientInterface.ConnectState currentState) throws Exception {
- if (prevState == MetaClientInterface.ConnectState.EXPIRED
- && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+ LOG.info("Connect state changed from {} to {}", prevState, currentState);
+ if (currentState == MetaClientInterface.ConnectState.CONNECTED) {
+ // when reconnected, we try to recreate the ephemeral node for
participant
for (String leaderPath : _participantInfos.keySet()) {
- _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, _participantInfos.get(leaderPath),
- MetaClientInterface.EntryMode.EPHEMERAL);
+ try {
+ LOG.info("Recreate participant node for leaderPath {}.",
leaderPath);
+ _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, _participantInfos.get(leaderPath),
+ MetaClientInterface.EntryMode.EPHEMERAL);
+ } catch (MetaClientNodeExistsException ex) {
+ // If reconnected before expire, the ephemeral node is still there.
+ LOG.info("Participant {} already in leader group {}.",
_participant, leaderPath);
+ }
}
- } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
- && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+ // touch leader node to renew session ID
touchLeaderNode();
}
}
@@ -431,6 +439,7 @@ public class LeaderElectionClient implements AutoCloseable {
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
try {
+ LOG.info("Try touch leader node for path {}", _leaderGroups);
_metaClient.set(key, tup.left, expectedVersion);
} catch (MetaClientNoNodeException ex) {
LOG.info("leaderPath {} gone when retouch leader node.", key);
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index 067fe3eb5..92c0182b3 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -185,4 +185,22 @@ public class TestUtil {
zkClient.process(event);
}
+ /**
+ * Simulate a zk state change by calling {@link
ZkClient#process(WatchedEvent)} directly
+ * This need to be done in a separate thread to simulate ZkClient
eventThread.
+ */
+ public static void simulateZkStateClosedAndReconnect(ZkMetaClient client)
throws InterruptedException {
+ final ZkClient zkClient = client.getZkClient();
+ WatchedEvent event =
+ new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Closed,
+ null);
+ zkClient.process(event);
+
+ Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+ event = new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected,
+ null);
+ zkClient.process(event);
+ }
+
}
\ No newline at end of file
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 248643652..c7aa06eff 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -5,7 +5,6 @@ import org.apache.helix.metaclient.MetaClientTestUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.TestUtil;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
@@ -41,6 +40,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
}
}
+
// Test that calling isLeader before client joins
LeaderElectionParticipantPool returns false and does not throw NPE
@Test
public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
@@ -299,7 +299,7 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
System.out.println("END TestLeaderElection.testSessionExpire");
}
- @Test(dependsOnMethods = "testSessionExpire")
+ @Test (dependsOnMethods = "testSessionExpire")
public void testClientDisconnectAndReconnectBeforeExpire() throws Exception {
System.out.println("START
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
String leaderPath = LEADER_PATH +
"/testClientDisconnectAndReconnectBeforeExpire";
@@ -354,6 +354,34 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
System.out.println("END
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
}
+ @Test(dependsOnMethods = "testClientDisconnectAndReconnectBeforeExpire")
+ public void testClientClosedAndReconnectAfterExpire() throws Exception {
+ System.out.println("START
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+ String leaderPath = LEADER_PATH +
"/testClientClosedAndReconnectAfterExpire";
+ LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+ participantInfo.setSimpleField("Key1", "value1");
+ LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+ participantInfo2.setSimpleField("Key2", "value2");
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+ // session expire and reconnect
+ expireSession((ZkMetaClient) clt1.getMetaClient());
+ // clt1 closed and reconnected
+ simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());
+
+ // when session recreated, participant info node should maintain
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+ ((ZkMetaClient<?>) clt1.getMetaClient()).close();
+ clt2.close();
+ System.out.println("END
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+ }
+
private void joinPoolTestHelper(String leaderPath, LeaderElectionClient
clt1, LeaderElectionClient clt2)
throws Exception {
clt1.joinLeaderElectionParticipantPool(leaderPath);