This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 29337efc8 Leader election client - Proactively send leader gone event 
when disconnect from ZK  (#2585)
29337efc8 is described below

commit 29337efc893eeab1cced5fe3786db20d5ce141b4
Author: xyuanlu <[email protected]>
AuthorDate: Tue Aug 15 17:15:27 2023 -0700

    Leader election client - Proactively send leader gone event when disconnect 
from ZK  (#2585)
---
 .../leaderelection/LeaderElectionClient.java       |  64 ++++--
 .../LeaderElectionListenerInterfaceAdapter.java    |  29 ++-
 .../zk/TestConnectStateChangeListenerAndRetry.java |  32 +--
 .../metaclient/impl/zk/TestStressZkClient.java     |   3 +-
 .../apache/helix/metaclient/impl/zk/TestUtil.java  |  24 ++
 .../leaderelection/LeaderElectionPuppy.java        |   5 +
 .../recipes/leaderelection/TestLeaderElection.java | 247 ++++++++++++++-------
 .../TestMultiClientLeaderElection.java             |   3 +
 8 files changed, 273 insertions(+), 134 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 7b13778c0..3bcf09ceb 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -33,6 +33,7 @@ import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientBadVersionException;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
 import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
@@ -83,7 +84,7 @@ public class LeaderElectionClient implements AutoCloseable {
   /**
    * Construct a LeaderElectionClient using a user passed in 
leaderElectionConfig. It creates a MetaClient
    * instance underneath.
-   * When MetaClient is auto closed be cause of being disconnected and auto 
retry connection timed out, A new
+   * When MetaClient is auto closed because of being disconnected and auto 
retry connection timed out, A new
    * MetaClient instance will be created and keeps retry connection.
    *
    * @param metaClientConfig The config used to create an metaclient.
@@ -257,20 +258,24 @@ public class LeaderElectionClient implements 
AutoCloseable {
     }
     // check if current participant is the leader
     // read data and stats, check, and multi check + delete
-    ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = 
_metaClient.getDataAndStat(key);
-    if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
-      int expectedVersion = tup.right.getVersion();
-      List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), 
Op.delete(key, expectedVersion));
-      //Execute transactional support on operations
-      List<OpResult> opResults = _metaClient.transactionOP(ops);
-      if (opResults.get(0).getType() == ERRORRESULT) {
-        if (isLeader(leaderPath)) {
-          // Participant re-elected as leader.
-          throw new ConcurrentModificationException("Concurrent operation, 
please retry");
-        } else {
-          LOG.info("Someone else is already leader");
+    try {
+      ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = 
_metaClient.getDataAndStat(key);
+      if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+        int expectedVersion = tup.right.getVersion();
+        List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), 
Op.delete(key, expectedVersion));
+        //Execute transactional support on operations
+        List<OpResult> opResults = _metaClient.transactionOP(ops);
+        if (opResults.get(0).getType() == ERRORRESULT) {
+          if (isLeader(leaderPath)) {
+            // Participant re-elected as leader.
+            throw new ConcurrentModificationException("Concurrent operation, 
please retry");
+          } else {
+            LOG.info("Someone else is already leader");
+          }
         }
       }
+    } catch (MetaClientNoNodeException ex) {
+      LOG.info("No Leader for participant pool {} when exit the pool", 
leaderPath);
     }
   }
 
@@ -334,8 +339,10 @@ public class LeaderElectionClient implements AutoCloseable 
{
    * @return A boolean value indicating if registration is success.
    */
   public boolean subscribeLeadershipChanges(String leaderPath, 
LeaderElectionListenerInterface listener) {
-    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new 
LeaderElectionListenerInterfaceAdapter(listener),
-        false);
+    LeaderElectionListenerInterfaceAdapter adapter = new 
LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
+        adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe 
to events even when the path does not exist yet
+    _metaClient.subscribeStateChanges(adapter);
     return false;
   }
 
@@ -344,7 +351,10 @@ public class LeaderElectionClient implements AutoCloseable 
{
    * @param listener An implementation of LeaderElectionListenerInterface
    */
   public void unsubscribeLeadershipChanges(String leaderPath, 
LeaderElectionListenerInterface listener) {
-    _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new 
LeaderElectionListenerInterfaceAdapter(listener));
+    LeaderElectionListenerInterfaceAdapter adapter = new 
LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+    _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
+        );
+    _metaClient.unsubscribeConnectStateChanges(adapter);
   }
 
   @Override
@@ -395,6 +405,9 @@ public class LeaderElectionClient implements AutoCloseable {
           _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, _participantInfos.get(leaderPath),
               MetaClientInterface.EntryMode.EPHEMERAL);
         }
+      } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
+          && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+        touchLeaderNode();
       }
     }
 
@@ -404,6 +417,25 @@ public class LeaderElectionClient implements AutoCloseable 
{
     }
   }
 
+  private void touchLeaderNode() {
+    for (String leaderPath : _leaderGroups) {
+      String key = leaderPath;
+      ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = 
_metaClient.getDataAndStat(key);
+      if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+        int expectedVersion = tup.right.getVersion();
+        try {
+          _metaClient.set(key, tup.left, expectedVersion);
+        } catch (MetaClientNoNodeException ex) {
+          LOG.info("leaderPath {} gone when retouch leader node.", key);
+        } catch (MetaClientBadVersionException e) {
+          LOG.info("New leader for leaderPath {} when retouch leader node.", 
key);
+        } catch (MetaClientException ex) {
+          LOG.warn("Failed to touch {} when reconnected.", key, ex);
+        }
+      }
+    }
+  }
+
   public MetaClientInterface getMetaClient() {
     return _metaClient;
   }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
index 5c64d6790..ee790ac82 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
@@ -1,14 +1,18 @@
 package org.apache.helix.metaclient.recipes.leaderelection;
 
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
 import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
 
 import static 
org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*;
 
 
-public class LeaderElectionListenerInterfaceAdapter implements 
DataChangeListener {
+public class LeaderElectionListenerInterfaceAdapter implements 
DataChangeListener, ConnectStateChangeListener {
+  private String _leaderPath;
   private final LeaderElectionListenerInterface _leaderElectionListener;
 
-  public 
LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface 
leaderElectionListener) {
+  public LeaderElectionListenerInterfaceAdapter(String leaderPath, 
LeaderElectionListenerInterface leaderElectionListener) {
+    _leaderPath = leaderPath;
     _leaderElectionListener = leaderElectionListener;
   }
 
@@ -16,11 +20,12 @@ public class LeaderElectionListenerInterfaceAdapter 
implements DataChangeListene
   public void handleDataChange(String key, Object data, ChangeType changeType) 
throws Exception {
     switch (changeType) {
       case  ENTRY_CREATED:
+      case ENTRY_UPDATE:
         String newLeader = ((LeaderInfo) data).getLeaderName();
-        _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED, 
newLeader);
+        _leaderElectionListener.onLeadershipChange(_leaderPath, 
LEADER_ACQUIRED, newLeader);
         break;
       case ENTRY_DELETED:
-        _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, "");
+        _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, 
"");
     }
   }
 
@@ -40,4 +45,20 @@ public class LeaderElectionListenerInterfaceAdapter 
implements DataChangeListene
   public int hashCode() {
     return _leaderElectionListener.hashCode();
   }
+
+  @Override
+  public void handleConnectStateChanged(MetaClientInterface.ConnectState 
prevState,
+      MetaClientInterface.ConnectState currentState) throws Exception {
+    if (currentState == MetaClientInterface.ConnectState.DISCONNECTED) {
+      // when disconnected, notify leader lost even though the ephemeral node 
is not gone until expire
+      // Leader election client will touch the node if reconnect before expire
+      _leaderElectionListener.onLeadershipChange(_leaderPath, LEADER_LOST, "");
+    }
+
+  }
+
+  @Override
+  public void handleConnectionEstablishmentError(Throwable error) throws 
Exception {
+
+  }
 }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
index 086db51c7..f088140c6 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -48,32 +48,14 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
 import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
 
 
 public class TestConnectStateChangeListenerAndRetry  {
-  protected static final String ZK_ADDR = "localhost:2181";
+  protected static final String ZK_ADDR = "localhost:2184";
   protected static ZkServer _zkServer;
 
-  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
-  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
-  private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
-
-  /**
-   * Simulate a zk state change by calling {@link 
ZkClient#process(WatchedEvent)} directly
-   * This need to be done in a separate thread to simulate ZkClient 
eventThread.
-   */
-  private static void simulateZkStateReconnected(ZkClient zkClient) throws 
InterruptedException {
-      WatchedEvent event =
-          new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected,
-              null);
-      zkClient.process(event);
-
-      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
-
-      event = new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.SyncConnected,
-          null);
-      zkClient.process(event);
-  }
+
 
   @BeforeTest
   public void prepare() {
@@ -82,11 +64,6 @@ public class TestConnectStateChangeListenerAndRetry  {
     _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
   }
 
-  @AfterTest
-  public void cleanUp() {
-    System.out.println("END TestConnectStateChangeListenerAndRetry at " + new 
Date(System.currentTimeMillis()));
-  }
-
   @Test
   public void testConnectState() {
     System.out.println("STARTING 
TestConnectStateChangeListenerAndRetry.testConnectState at " + new 
Date(System.currentTimeMillis()));
@@ -115,7 +92,7 @@ public class TestConnectStateChangeListenerAndRetry  {
         @Override
         public void run() {
           try {
-            simulateZkStateReconnected(zkMetaClient.getZkClient());
+            simulateZkStateReconnected(zkMetaClient);
           } catch (InterruptedException e) {
            Assert.fail("Exception in simulateZkStateReconnected", e);
           }
@@ -170,6 +147,7 @@ public class TestConnectStateChangeListenerAndRetry  {
       } catch (Exception ex) {
         Assert.assertTrue(ex instanceof IllegalStateException);
       }
+      zkMetaClient.unsubscribeConnectStateChanges(listener);
     }
     System.out.println("END 
TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + 
new Date(System.currentTimeMillis()));
   }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
index 8a3f9c115..6f358f0e8 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
@@ -48,7 +48,8 @@ public class TestStressZkClient extends ZkMetaClientTestBase {
   }
 
   @AfterTest
-  private void tearDown() {
+  @Override
+  public void cleanUp() {
     this._zkMetaClient.close();
   }
 
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index fbf1ab1f3..067fe3eb5 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -40,6 +40,10 @@ import org.testng.Assert;
 
 public class TestUtil {
 
+  public static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  public static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+  public static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
   static java.lang.reflect.Field getField(Class clazz, String fieldName)
       throws NoSuchFieldException {
     try {
@@ -161,4 +165,24 @@ public class TestUtil {
         "Fail to expire current session, zk: " + curZookeeper);
   }
 
+
+
+  /**
+   * Simulate a zk state change by calling {@link 
ZkClient#process(WatchedEvent)} directly
+   * This needs to be done in a separate thread to simulate ZkClient 
eventThread.
+   */
+  public static void simulateZkStateReconnected(ZkMetaClient client) throws 
InterruptedException {
+    final ZkClient zkClient = client.getZkClient();
+    WatchedEvent event =
+        new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected,
+            null);
+    zkClient.process(event);
+
+    Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+    event = new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.SyncConnected,
+        null);
+    zkClient.process(event);
+  }
+
 }
\ No newline at end of file
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
index 5f1fdf631..3f123d3ac 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionPuppy.java
@@ -82,6 +82,11 @@ public class LeaderElectionPuppy extends AbstractPuppy {
       _leaderElectionClient.exitLeaderElectionParticipantPool(_leaderGroup);
     } catch (MetaClientException ignore) {
       // already leave the pool. OK to throw exception.
+    } finally {
+      try {
+        _leaderElectionClient.close();
+      } catch (Exception e) {
+      }
     }
   }
 }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 71d85fdb9..75917623c 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -5,9 +5,13 @@ import org.apache.helix.metaclient.MetaClientTestUtil;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.TestUtil;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
+import org.testng.annotations.AfterTest;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
@@ -19,13 +23,24 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
   private static final String PARTICIPANT_NAME2 = "participant_2";
   private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1";
 
-  public  static LeaderElectionClient createLeaderElectionClient(String 
participantName) {
+  public static LeaderElectionClient createLeaderElectionClient(String 
participantName) {
     MetaClientConfig.StoreType storeType = 
MetaClientConfig.StoreType.ZOOKEEPER;
     MetaClientConfig config =
         new 
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build();
     return new LeaderElectionClient(config, participantName);
   }
 
+  @AfterTest
+  @Override
+  public void cleanUp() {
+    ZkMetaClientConfig config = new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+        .build();
+    try (ZkMetaClient<ZNRecord> client = new ZkMetaClient<>(config)) {
+      client.connect();
+      client.recursiveDelete(LEADER_PATH);
+    }
+  }
+
   @Test
   public void testAcquireLeadership() throws Exception {
     System.out.println("START TestLeaderElection.testAcquireLeadership");
@@ -47,7 +62,6 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     Assert.assertEquals(clt1.getLeader(leaderPath), 
clt2.getLeader(leaderPath));
     Assert.assertEquals(clt1.getLeader(leaderPath), PARTICIPANT_NAME1);
 
-
     // client 1 exit leader election group, and client 2 should be current 
leader.
     clt1.exitLeaderElectionParticipantPool(leaderPath);
 
@@ -75,7 +89,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     System.out.println("END TestLeaderElection.testAcquireLeadership");
   }
 
-  @Test
+  @Test(dependsOnMethods = "testAcquireLeadership")
   public void testElectionPoolMembership() throws Exception {
     System.out.println("START TestLeaderElection.testElectionPoolMembership");
     String leaderPath = LEADER_PATH + "/_testElectionPoolMembership";
@@ -111,49 +125,12 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     clt2.exitLeaderElectionParticipantPool(leaderPath);
 
     Assert.assertNull(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2));
+    clt1.close();
+    clt2.close();
     System.out.println("END TestLeaderElection.testElectionPoolMembership");
   }
 
-  @Test
-  public void testSessionExpire() throws Exception {
-    System.out.println("START TestLeaderElection.testSessionExpire");
-    String leaderPath = LEADER_PATH + "/_testSessionExpire";
-    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
-    participantInfo.setSimpleField("Key1", "value1");
-    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
-    participantInfo2.setSimpleField("Key2", "value2");
-    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
-    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
-
-    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
-    try {
-      clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // 
no op
-    } catch (ConcurrentModificationException ex) {
-      // expected
-      Assert.assertEquals(ex.getClass().getName(), 
"java.util.ConcurrentModificationException");
-    }
-    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
-    // a leader should be up
-    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
-      return (clt1.getLeader(leaderPath) != null);
-    }, MetaClientTestUtil.WAIT_DURATION));
-
-    // session expire and reconnect
-    expireSession((ZkMetaClient) clt1.getMetaClient());
-
-    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
-      return (clt1.getLeader(leaderPath) != null);
-    }, MetaClientTestUtil.WAIT_DURATION));
-    Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
-    Assert.assertNotNull(clt1.getLeader(leaderPath));
-    // when session recreated, participant info node should maintain
-    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
-    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
-    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
-    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
-    System.out.println("END TestLeaderElection.testSessionExpire");
-  }
-  @Test (dependsOnMethods = "testAcquireLeadership")
+  @Test(dependsOnMethods = "testElectionPoolMembership")
   public void testLeadershipListener() throws Exception {
     System.out.println("START TestLeaderElection.testLeadershipListener");
     String leaderPath = LEADER_PATH + "/testLeadershipListener";
@@ -165,45 +142,45 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     final int count = 10;
     final int[] numNewLeaderEvent = {0};
     final int[] numLeaderGoneEvent = {0};
-    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2);
-    CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2);
-
-     LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
-
-       @Override
-       public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
-         if (type == ChangeType.LEADER_LOST) {
-           countDownLatchLeaderGone.countDown();
-           Assert.assertEquals(curLeader.length(), 0);
-           numLeaderGoneEvent[0]++;
-         } else if (type == ChangeType.LEADER_ACQUIRED) {
-           countDownLatchNewLeader.countDown();
-           numNewLeaderEvent[0]++;
-           Assert.assertTrue(curLeader.length()!=0);
-         } else {
-           Assert.fail();
-         }
-       }
-     };
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count * 2);
+    CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count * 2);
+
+    LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
+
+      @Override
+      public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
+        if (type == ChangeType.LEADER_LOST) {
+          countDownLatchLeaderGone.countDown();
+          Assert.assertEquals(curLeader.length(), 0);
+          numLeaderGoneEvent[0]++;
+        } else if (type == ChangeType.LEADER_ACQUIRED) {
+          countDownLatchNewLeader.countDown();
+          numNewLeaderEvent[0]++;
+          Assert.assertTrue(curLeader.length() != 0);
+        } else {
+          Assert.fail();
+        }
+      }
+    };
 
     clt3.subscribeLeadershipChanges(leaderPath, listener);
 
     // each iteration will be participant_1 is new leader, leader gone, 
participant_2 is new leader, leader gone
-    for (int i=0; i<count; ++i) {
+    for (int i = 0; i < count; ++i) {
       joinPoolTestHelper(leaderPath, clt1, clt2);
       Thread.sleep(1000);
     }
 
     
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
     
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
-    Assert.assertEquals(numLeaderGoneEvent[0], count*2);
-    Assert.assertEquals(numNewLeaderEvent[0], count*2);
+    Assert.assertEquals(numLeaderGoneEvent[0], count * 2);
+    Assert.assertEquals(numNewLeaderEvent[0], count * 2);
 
     clt3.unsubscribeLeadershipChanges(leaderPath, listener);
     // listener shouldn't receive any event after unsubscribe
     joinPoolTestHelper(leaderPath, clt1, clt2);
-    Assert.assertEquals(numLeaderGoneEvent[0], count*2);
-    Assert.assertEquals(numNewLeaderEvent[0], count*2);
+    Assert.assertEquals(numLeaderGoneEvent[0], count * 2);
+    Assert.assertEquals(numNewLeaderEvent[0], count * 2);
 
     clt1.close();
     clt2.close();
@@ -211,13 +188,46 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     System.out.println("END TestLeaderElection.testLeadershipListener");
   }
 
-  private void joinPoolTestHelper(String leaderPath, LeaderElectionClient 
clt1, LeaderElectionClient clt2) throws Exception {
+  @Test(dependsOnMethods = "testLeadershipListener")
+  public void testRelinquishLeadership() throws Exception {
+    System.out.println("START TestLeaderElection.testRelinquishLeadership");
+    String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+    LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    final int count = 1;
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+    CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
+
+    LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
+
+      @Override
+      public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
+        if (type == ChangeType.LEADER_LOST) {
+          countDownLatchLeaderGone.countDown();
+          Assert.assertEquals(curLeader.length(), 0);
+        } else if (type == ChangeType.LEADER_ACQUIRED) {
+          countDownLatchNewLeader.countDown();
+          Assert.assertTrue(curLeader.length() != 0);
+        } else {
+          Assert.fail();
+        }
+      }
+    };
+
     clt1.joinLeaderElectionParticipantPool(leaderPath);
     clt2.joinLeaderElectionParticipantPool(leaderPath);
+    clt3.subscribeLeadershipChanges(leaderPath, listener);
+    // clt1 gone
+    clt1.relinquishLeader(leaderPath);
 
-    Thread.sleep(2000);
+    // participant 1 should have gone, and a leader gone event is sent
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
 
-    // clt1 gone
     clt1.exitLeaderElectionParticipantPool(leaderPath);
     Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(leaderPath) != null);
@@ -227,20 +237,63 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     }, MetaClientTestUtil.WAIT_DURATION));
 
     clt2.exitLeaderElectionParticipantPool(leaderPath);
-
+    clt1.close();
+    clt2.close();
+    clt3.close();
+    System.out.println("END TestLeaderElection.testRelinquishLeadership");
   }
 
-  @Test (dependsOnMethods = "testLeadershipListener")
-  public void testRelinquishLeadership() throws Exception {
-    System.out.println("START TestLeaderElection.testRelinquishLeadership");
-    String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
+  @Test(dependsOnMethods = "testAcquireLeadership")
+  public void testSessionExpire() throws Exception {
+    System.out.println("START TestLeaderElection.testSessionExpire");
+    String leaderPath = LEADER_PATH + "/_testSessionExpire";
+    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+    participantInfo.setSimpleField("Key1", "value1");
+    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+    participantInfo2.setSimpleField("Key2", "value2");
     LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
     LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
-    LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
 
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    try {
+      clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // 
no op
+    } catch (ConcurrentModificationException ex) {
+      // expected
+      Assert.assertEquals(ex.getClass().getName(), 
"java.util.ConcurrentModificationException");
+    }
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+    // a leader should be up
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    // session expire and reconnect
+    expireSession((ZkMetaClient) clt1.getMetaClient());
+
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+    Assert.assertNotNull(clt1.getLeader(leaderPath));
+    // when session recreated, participant info node should maintain
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+    clt1.close();
+    clt2.close();
+    System.out.println("END TestLeaderElection.testSessionExpire");
+  }
+
+  @Test(dependsOnMethods = "testSessionExpire")
+  public void testClientDisconnectAndReconnectBeforeExpire() throws Exception {
+    System.out.println("START 
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
+    String leaderPath = LEADER_PATH + 
"/testClientDisconnectAndReconnectBeforeExpire";
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
 
     final int count = 1;
-    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(count + 1);
     CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
 
     LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
@@ -250,27 +303,51 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
         if (type == ChangeType.LEADER_LOST) {
           countDownLatchLeaderGone.countDown();
           Assert.assertEquals(curLeader.length(), 0);
+          System.out.println("gone leader");
         } else if (type == ChangeType.LEADER_ACQUIRED) {
           countDownLatchNewLeader.countDown();
-          Assert.assertTrue(curLeader.length()!=0);
+          Assert.assertTrue(curLeader.length() != 0);
+          System.out.println("new  leader");
         } else {
           Assert.fail();
         }
       }
     };
 
+    clt1.subscribeLeadershipChanges(leaderPath, listener);
     clt1.joinLeaderElectionParticipantPool(leaderPath);
     clt2.joinLeaderElectionParticipantPool(leaderPath);
-    clt3.subscribeLeadershipChanges(leaderPath, listener);
-    // clt1 gone
-    clt1.relinquishLeader(leaderPath);
-
-    // participant 1 should have gone, and a leader gone event is sent
+    // check leader node version before we simulate disconnect.
     Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(leaderPath) != null);
     }, MetaClientTestUtil.WAIT_DURATION));
+    int leaderNodeVersion = ((ZkMetaClient) 
clt1.getMetaClient()).exists(leaderPath + "/LEADER").getVersion();
+    System.out.println("version " + leaderNodeVersion);
+
+    // clt1 is disconnected and reconnects before the session expires
+    simulateZkStateReconnected((ZkMetaClient) clt1.getMetaClient());
+
+    
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
     
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
 TimeUnit.MILLISECONDS));
 
+    leaderNodeVersion = ((ZkMetaClient) 
clt2.getMetaClient()).exists(leaderPath + "/LEADER").getVersion();
+    System.out.println("version " + leaderNodeVersion);
+
+    clt1.exitLeaderElectionParticipantPool(leaderPath);
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+    clt1.close();
+    clt2.close();
+    System.out.println("END 
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
+  }
+
+  private void joinPoolTestHelper(String leaderPath, LeaderElectionClient 
clt1, LeaderElectionClient clt2)
+      throws Exception {
+    clt1.joinLeaderElectionParticipantPool(leaderPath);
+    clt2.joinLeaderElectionParticipantPool(leaderPath);
+
+    Thread.sleep(2000);
+
+    // clt1 gone
     clt1.exitLeaderElectionParticipantPool(leaderPath);
     Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(leaderPath) != null);
@@ -280,7 +357,5 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     }, MetaClientTestUtil.WAIT_DURATION));
 
     clt2.exitLeaderElectionParticipantPool(leaderPath);
-    System.out.println("END TestLeaderElection.testRelinquishLeadership");
   }
-
 }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
index 0fe424501..4fb508790 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestMultiClientLeaderElection.java
@@ -51,11 +51,14 @@ public class TestMultiClientLeaderElection extends 
ZkMetaClientTestBase {
     _zkMetaClient.create("/Parent/a", "");
   }
   @AfterTest
+  @Override
   public void cleanUp() {
     try {
       _zkMetaClient.recursiveDelete(_leaderElectionGroup);
     } catch (MetaClientException ex) {
       _zkMetaClient.recursiveDelete(_leaderElectionGroup);
+    } finally {
+      _zkMetaClient.close();
     }
   }
 


Reply via email to