This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch rest0
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c23abb1b567f733a62383dedfbc7e1d4f1353595
Author: Huizhi L <[email protected]>
AuthorDate: Thu Jan 9 15:30:44 2020 -0800

    Add method to wait for and return the established session's ID (#677)
    
    Calling zkClient's getSessionId() separately after waitUntilConnected() can
introduce a session race condition: session A is connected in
waitUntilConnected(), but by the time zkClient.getSessionId() is called in
ZKHelixManager, session A may already have expired, so getSessionId() returns
session B instead. This session ID is critical for handling the first new
session after the zkClient is created in ZKHelixManager.
    
    Solution: add a new method waitForEstablishedSession() that waits for the
SyncConnected state and reads the session ID before unlocking the eventLock.
    
    Change list:
    - Add a new method waitForEstablishedSession()
    - Add a unit test to cover the new method.
---
 .../apache/helix/manager/zk/ZKHelixManager.java    | 10 +++------
 .../helix/manager/zk/client/HelixZkClient.java     | 23 +++++++++++++++++++
 .../helix/manager/zk/zookeeper/ZkClient.java       | 26 +++++++++++++++++++---
 .../apache/helix/manager/zk/TestRawZkClient.java   | 20 +++++++++++++++++
 4 files changed, 69 insertions(+), 10 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 6fabca6..0d66af8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -699,19 +699,15 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     int retryCount = 0;
     while (retryCount < 3) {
       try {
-        // TODO: get session id from waitUntilConnected to avoid race condition
-        if (!_zkclient.waitUntilConnected(_connectionInitTimeout, 
TimeUnit.MILLISECONDS)) {
-          throw new ZkTimeoutException(
-              "Unable to connect to zookeeper server within timeout: " + 
_connectionInitTimeout
-                  + " ms.");
-        }
+        final long sessionId =
+            _zkclient.waitForEstablishedSession(_connectionInitTimeout, 
TimeUnit.MILLISECONDS);
         handleStateChanged(KeeperState.SyncConnected);
         /*
          * This listener is subscribed after SyncConnected and firing new 
session events,
          * which means this listener has not yet handled new session, so we 
have to handle new
          * session here just for this listener.
          */
-        handleNewSession(ZKUtil.toHexSessionId(_zkclient.getSessionId()));
+        handleNewSession(ZKUtil.toHexSessionId(sessionId));
         break;
       } catch (HelixException e) {
         LOG.error("fail to createClient.", e);
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
index f5ed25a..5f58b69 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.manager.zk.BasicZkSerializer;
@@ -182,6 +183,28 @@ public interface HelixZkClient {
   // ZK state control
   boolean waitUntilConnected(long time, TimeUnit timeUnit);
 
+  /**
+   * Waits for SyncConnected state and returns a valid session ID(non-zero). 
The implementation of
+   * this method should wait for SyncConnected state and ZK session to be 
established, and should
+   * guarantee the established session's ID is returned before keeper state 
changes.
+   *
+   * Please note: this default implementation may have race condition issue 
and return an unexpected
+   * session ID that is zero or another new session's ID. The default 
implementation is for backward
+   * compatibility purpose.
+   *
+   * @param timeout Max waiting time for connecting to ZK server.
+   * @param timeUnit Time unit for the timeout.
+   * @return A valid ZK session ID which is non-zero.
+   */
+  default long waitForEstablishedSession(long timeout, TimeUnit timeUnit) {
+    if (!waitUntilConnected(timeout, timeUnit)) {
+      throw new ZkTimeoutException(
+          "Failed to get established session because connecting to ZK server 
has timed out in "
+              + timeout + " " + timeUnit);
+    }
+    return getSessionId();
+  }
+
   String getServers();
 
   long getSessionId();
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 94bccbb..a28ea83 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -1368,15 +1368,29 @@ public class ZkClient implements Watcher {
     return _connection;
   }
 
+  public long waitForEstablishedSession(long timeout, TimeUnit timeUnit) {
+    validateCurrentThread();
+
+    acquireEventLock();
+    try {
+      if (!waitForKeeperState(KeeperState.SyncConnected, timeout, timeUnit)) {
+        throw new ZkTimeoutException("Waiting to be connected to ZK server has 
timed out.");
+      }
+      // Reading session ID before unlocking event lock is critical to 
guarantee the established
+      // session's ID won't change.
+      return getSessionId();
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
   public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws 
ZkInterruptedException {
     return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit);
   }
 
   public boolean waitForKeeperState(KeeperState keeperState, long time, 
TimeUnit timeUnit)
       throws ZkInterruptedException {
-    if (_zookeeperEventThread != null && Thread.currentThread() == 
_zookeeperEventThread) {
-      throw new IllegalArgumentException("Must not be done in the zookeeper 
event thread.");
-    }
+    validateCurrentThread();
     Date timeout = new Date(System.currentTimeMillis() + 
timeUnit.toMillis(time));
 
     LOG.debug("Waiting for keeper state " + keeperState);
@@ -2136,4 +2150,10 @@ public class ZkClient implements Watcher {
       return _listener.hashCode();
     }
   }
+
+  private void validateCurrentThread() {
+    if (_zookeeperEventThread != null && Thread.currentThread() == 
_zookeeperEventThread) {
+      throw new IllegalArgumentException("Must not be done in the zookeeper 
event thread.");
+    }
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index fd7bb3c..111bd7a 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -712,4 +712,24 @@ public class TestRawZkClient extends ZkUnitTestBase {
     // of its owner expires.
     _zkClient.delete(path);
   }
+
+  @Test
+  public void testWaitForEstablishedSession() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    Assert.assertTrue(zkClient.waitForEstablishedSession(1, TimeUnit.SECONDS) 
!= 0L);
+    TestHelper.stopZkServer(_zkServer);
+    Assert.assertTrue(zkClient.waitForKeeperState(KeeperState.Disconnected, 1, 
TimeUnit.SECONDS));
+
+    try {
+      zkClient.waitForEstablishedSession(3, TimeUnit.SECONDS);
+      Assert.fail("Connecting to zk server should time out and 
ZkTimeoutException is expected.");
+    } catch (ZkTimeoutException expected) {
+      // Because zk server is shutdown, zkClient should not connect to zk 
server and a
+      // ZkTimeoutException should be thrown.
+    }
+
+    zkClient.close();
+    // Recover zk server for later tests.
+    _zkServer.start();
+  }
 }

Reply via email to