This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch rest0 in repository https://gitbox.apache.org/repos/asf/helix.git
commit c23abb1b567f733a62383dedfbc7e1d4f1353595 Author: Huizhi L <[email protected]> AuthorDate: Thu Jan 9 15:30:44 2020 -0800 Add method to wait and return established session's ID (#677) zkClient's getSessionId() could introduce a session race condition: session A is connected in waitUntilConnected, but when zkClient.getSessionId() is called in zkHelixManager, session A might have already expired and so zkClient.getSessionId() gets session B. This session ID is critical for the first time handling new session after zkclient is created in zkHelixManager. Solution: add a new method waitForEstablishedSession() to wait for SyncConnected state and return the session id before unlocking the eventLock. Change list: - Add a new method waitForEstablishedSession() - Add a unit test to cover the new method. --- .../apache/helix/manager/zk/ZKHelixManager.java | 10 +++------ .../helix/manager/zk/client/HelixZkClient.java | 23 +++++++++++++++++++ .../helix/manager/zk/zookeeper/ZkClient.java | 26 +++++++++++++++++++--- .../apache/helix/manager/zk/TestRawZkClient.java | 20 +++++++++++++++++ 4 files changed, 69 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 6fabca6..0d66af8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -699,19 +699,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { int retryCount = 0; while (retryCount < 3) { try { - // TODO: get session id from waitUntilConnected to avoid race condition - if (!_zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS)) { - throw new ZkTimeoutException( - "Unable to connect to zookeeper server within timeout: " + _connectionInitTimeout - + " ms."); - } + final long sessionId = 
_zkclient.waitForEstablishedSession(_connectionInitTimeout, TimeUnit.MILLISECONDS); handleStateChanged(KeeperState.SyncConnected); /* * This listener is subscribed after SyncConnected and firing new session events, * which means this listener has not yet handled new session, so we have to handle new * session here just for this listener. */ - handleNewSession(ZKUtil.toHexSessionId(_zkclient.getSessionId())); + handleNewSession(ZKUtil.toHexSessionId(sessionId)); break; } catch (HelixException e) { LOG.error("fail to createClient.", e); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java index f5ed25a..5f58b69 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java @@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.exception.ZkTimeoutException; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.manager.zk.BasicZkSerializer; @@ -182,6 +183,28 @@ public interface HelixZkClient { // ZK state control boolean waitUntilConnected(long time, TimeUnit timeUnit); + /** + * Waits for SyncConnected state and returns a valid session ID(non-zero). The implementation of + * this method should wait for SyncConnected state and ZK session to be established, and should + * guarantee the established session's ID is returned before keeper state changes. + * + * Please note: this default implementation may have race condition issue and return an unexpected + * session ID that is zero or another new session's ID. The default implementation is for backward + * compatibility purpose. 
+ * + * @param timeout Max waiting time for connecting to ZK server. + * @param timeUnit Time unit for the timeout. + * @return A valid ZK session ID which is non-zero. + */ + default long waitForEstablishedSession(long timeout, TimeUnit timeUnit) { + if (!waitUntilConnected(timeout, timeUnit)) { + throw new ZkTimeoutException( + "Failed to get established session because connecting to ZK server has timed out in " + + timeout + " " + timeUnit); + } + return getSessionId(); + } + String getServers(); long getSessionId(); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index 94bccbb..a28ea83 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -1368,15 +1368,29 @@ public class ZkClient implements Watcher { return _connection; } + public long waitForEstablishedSession(long timeout, TimeUnit timeUnit) { + validateCurrentThread(); + + acquireEventLock(); + try { + if (!waitForKeeperState(KeeperState.SyncConnected, timeout, timeUnit)) { + throw new ZkTimeoutException("Waiting to be connected to ZK server has timed out."); + } + // Reading session ID before unlocking event lock is critical to guarantee the established + // session's ID won't change. 
+ return getSessionId(); + } finally { + getEventLock().unlock(); + } + } + public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException { return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit); } public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) throws ZkInterruptedException { - if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { - throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); - } + validateCurrentThread(); Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); LOG.debug("Waiting for keeper state " + keeperState); @@ -2136,4 +2150,10 @@ public class ZkClient implements Watcher { return _listener.hashCode(); } } + + private void validateCurrentThread() { + if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { + throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java index fd7bb3c..111bd7a 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java @@ -712,4 +712,24 @@ public class TestRawZkClient extends ZkUnitTestBase { // of its owner expires. 
_zkClient.delete(path); } + + @Test + public void testWaitForEstablishedSession() { + ZkClient zkClient = new ZkClient(ZK_ADDR); + Assert.assertTrue(zkClient.waitForEstablishedSession(1, TimeUnit.SECONDS) != 0L); + TestHelper.stopZkServer(_zkServer); + Assert.assertTrue(zkClient.waitForKeeperState(KeeperState.Disconnected, 1, TimeUnit.SECONDS)); + + try { + zkClient.waitForEstablishedSession(3, TimeUnit.SECONDS); + Assert.fail("Connecting to zk server should time out and ZkTimeoutException is expected."); + } catch (ZkTimeoutException expected) { + // Because zk server is shutdown, zkClient should not connect to zk server and a + // ZkTimeoutException should be thrown. + } + + zkClient.close(); + // Recover zk server for later tests. + _zkServer.start(); + } }
