This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push: new eaa88f600 Implement timeout for auto reconnect (#2409) eaa88f600 is described below commit eaa88f60082e31bc1441ef5d65d8ea2888b72301 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Tue Apr 4 13:13:46 2023 -0700 Implement timeout for auto reconnect (#2409) Implement timeout for auto reconnect ZkClient automatically reconnects when it gets disconnected from ZooKeeper. There is no timeout when the connection cannot be re-established. In ZkMetaClient, we have a separate thread that monitors the reconnect status and closes ZkClient if the connection cannot be re-established before the reconnect timeout elapses. --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 113 ++++++++++++- .../impl/zk/factory/ZkMetaClientConfig.java | 1 - .../policy/ExponentialBackoffReconnectPolicy.java | 20 ++- .../policy/MetaClientReconnectPolicy.java | 3 +- .../metaclient/policy/NoRetryReconnectPolicy.java | 36 ----- .../zk/TestConnectStateChangeListenerAndRetry.java | 180 +++++++++++++++++++++ .../helix/metaclient/impl/zk/TestZkMetaClient.java | 32 ---- .../metaclient/impl/zk/ZkMetaClientTestBase.java | 3 +- .../apache/helix/zookeeper/zkclient/ZkClient.java | 1 + 9 files changed, 304 insertions(+), 85 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 9614e53d6..84c329fe4 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -19,9 +19,15 @@ package org.apache.helix.metaclient.impl.zk; * under the License. 
*/ +import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.metaclient.api.AsyncCallback; import org.apache.helix.metaclient.api.ChildChangeListener; @@ -47,33 +53,47 @@ import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil; import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode; import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException; + public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class); private final ZkClient _zkClient; private final long _initConnectionTimeout; private final long _reconnectTimeout; + // After ZkClient gets disconnected from ZK server, it keeps retrying connection until connection + // is re-established or ZkClient is closed. We need a separate thread to monitor ZkClient + // reconnect and close ZkClient if it not able to reconnect within user specified timeout. 
+ private final ScheduledExecutorService _zkClientReconnectMonitor; + private ScheduledFuture<?> _reconnectMonitorFuture; + private ReconnectStateChangeListener _reconnectStateChangeListener; + // Lock all activities related to ZkClient connection + private ReentrantLock _zkClientConnectionMutex = new ReentrantLock(); + + public ZkMetaClient(ZkMetaClientConfig config) { _initConnectionTimeout = config.getConnectionInitTimeoutInMillis(); _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout(); // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should - // 1. Allow user to config max backoff interval (next PR) - // 2. Allow user to config reconnect policy (future PR) + // Allow user to config reconnect policy _zkClient = new ZkClient( new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/, config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false); + _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor(); + _reconnectStateChangeListener = new ReconnectStateChangeListener(); } @Override @@ -266,18 +286,25 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void connect() { - // TODO: throws IllegalStateException when already connected try { + _zkClientConnectionMutex.lock(); _zkClient.connect(_initConnectionTimeout, _zkClient); + // register _reconnectStateChangeListener as state change listener to react to ZkClient connect + // state change event. When ZkClient disconnected from ZK, it still auto reconnect until + // ZkClient is closed or connection re-established. + // We will need to close ZkClient when user set retry connection timeout. 
+ _zkClient.subscribeStateChanges(_reconnectStateChangeListener); } catch (ZkException e) { throw translateZkExceptionToMetaclientException(e); + } finally { + _zkClientConnectionMutex.unlock(); } } @Override public void disconnect() { - // TODO: This is a temp impl for test only. no proper interrupt handling and error handling. - _zkClient.close(); + cleanUpAndClose(true, true); + _zkClientReconnectMonitor.shutdownNow(); } @Override @@ -394,4 +421,80 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { public T deserialize(byte[] bytes, String path) { return _zkClient.deserialize(bytes, path); } + + /** + * A clean up method called when connect state change or MetaClient is closing. + * @param cancel If we want to cancel the reconnect monitor thread. + * @param close If we want to close ZkClient. + */ + private void cleanUpAndClose(boolean cancel, boolean close) { + _zkClientConnectionMutex.lock(); + try { + if (close && !_zkClient.isClosed()) { + _zkClient.close(); + // TODO: need to unsubscribe all persist watcher from ZK + // Add this in ZkClient when persist watcher change is in + // Also need to manually send CLOSED state change to state + // change listener (in change adapter) + LOG.info("ZkClient is closed"); + } + + if (cancel && _reconnectMonitorFuture != null) { + _reconnectMonitorFuture.cancel(true); + LOG.info("ZkClient reconnect monitor thread is canceled"); + } + + } finally { + _zkClientConnectionMutex.unlock(); + } + } + + private class ReconnectStateChangeListener implements IZkStateListener { + // Schedule a monitor to track ZkClient auto reconnect when Disconnected + // Cancel the monitor thread when connected. + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { + if (state == Watcher.Event.KeeperState.Disconnected) { + // Expired. 
start a new event monitoring retry + _zkClientConnectionMutex.lockInterruptibly(); + try { + if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled() + || _reconnectMonitorFuture.isDone()) { + _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> { + if (!_zkClient.getConnection().getZookeeperState().isConnected()) { + cleanUpAndClose(false, true); + } + }, _reconnectTimeout, TimeUnit.MILLISECONDS); + LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}", + _reconnectTimeout); + } + } finally { + _zkClientConnectionMutex.unlock(); + } + } else if (state == Watcher.Event.KeeperState.SyncConnected + || state == Watcher.Event.KeeperState.ConnectedReadOnly) { + cleanUpAndClose(true, false); + LOG.info("ZkClient is SyncConnected, reconnect monitor thread is canceled (if any)"); + } + } + + // Cancel the monitor thread when connected. + @Override + public void handleNewSession(String sessionId) throws Exception { + cleanUpAndClose(true, false); + LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)"); + } + + // Cancel the monitor thread and close ZkClient when connect error. 
+ @Override + public void handleSessionEstablishmentError(Throwable error) throws Exception { + cleanUpAndClose(true, true); + LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)"); + } + } + + @VisibleForTesting + ZkClient getZkClient() { + return _zkClient; + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java index 82c9bb20a..63d6ff07c 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java @@ -130,7 +130,6 @@ public class ZkMetaClientConfig extends MetaClientConfig { @Override public ZkMetaClientConfig build() { validate(); - return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis, _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, _monitorKey, _monitorInstanceName, diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java index 81e0c44f7..7c5829e10 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java @@ -21,6 +21,7 @@ package org.apache.helix.metaclient.policy; import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy; +import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_AUTO_RECONNECT_TIMEOUT_MS; import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS; import static 
org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS; @@ -32,22 +33,25 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_ */ public class ExponentialBackoffReconnectPolicy implements MetaClientReconnectPolicy { - private final long _maxBackOffInterval; - private final long _initBackoffInterval; + private final long _autoReconnectTimeout; @Override public RetryPolicyName getPolicyName() { return RetryPolicyName.EXP_BACKOFF; } - public ExponentialBackoffReconnectPolicy() { - _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS; - _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS; + @Override + public long getAutoReconnectTimeout() { + return _autoReconnectTimeout; } - public ExponentialBackoffReconnectPolicy(long maxBackOffInterval, long initBackoffInterval) { - _maxBackOffInterval = maxBackOffInterval; - _initBackoffInterval = initBackoffInterval; + public ExponentialBackoffReconnectPolicy() { + _autoReconnectTimeout = DEFAULT_AUTO_RECONNECT_TIMEOUT_MS; + } + public ExponentialBackoffReconnectPolicy(long autoReconnectTimeout) { + _autoReconnectTimeout = autoReconnectTimeout; } + + // TODO: Allow user to pass maxBackOffInterval and initBackoffInterval. 
} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java index be80fdd59..5ef56988e 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java @@ -30,14 +30,13 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_ public interface MetaClientReconnectPolicy { enum RetryPolicyName { - NO_RETRY, EXP_BACKOFF, LINEAR_BACKOFF } RetryPolicyName getPolicyName(); - default long getAutoReconnectTimeout() { + default long getAutoReconnectTimeout() { return DEFAULT_AUTO_RECONNECT_TIMEOUT_MS; } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java deleted file mode 100644 index f81273b3a..000000000 --- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.helix.metaclient.policy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy; - - -/** - * Policy to define client re-establish connection behavior when connection to underlying metadata - * store is expired. - * If this retry policy is passed to MetaClient, no auto retry connection will be issued when - * connection lost or expired. - */ -public class NoRetryReconnectPolicy implements MetaClientReconnectPolicy { - @Override - public RetryPolicyName getPolicyName() { - return RetryPolicyName.NO_RETRY; - } -} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java new file mode 100644 index 000000000..36b9b2131 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java @@ -0,0 +1,180 @@ +package org.apache.helix.metaclient.impl.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.helix.metaclient.api.ConnectStateChangeListener; +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy; +import org.apache.helix.zookeeper.zkclient.ZkClient; +import org.apache.helix.zookeeper.zkclient.ZkServer; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS; +import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS; + + +public class TestConnectStateChangeListenerAndRetry { + protected static final String ZK_ADDR = "localhost:2181"; + protected static ZkServer _zkServer; + + private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000; + private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000; + private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000; + + /** + * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly + * This need to be done in a separate thread to simulate ZkClient eventThread. 
+ */ + private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException { + WatchedEvent event = + new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, + null); + zkClient.process(event); + + Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN); + + event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, + null); + zkClient.process(event); + } + + @BeforeSuite + public void prepare() { + System.out.println("START TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis())); + // start local zookeeper server + _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR); + } + + @AfterSuite + public void cleanUp() { + System.out.println("END TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testConnectState() { + System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis())); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) { + zkMetaClient.connect(); + zkMetaClient.connect(); + Assert.fail("The second connect should throw IllegalStateException"); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IllegalStateException); + Assert.assertEquals(ex.getMessage(), "ZkClient is not in init state. 
connect() has already been called."); + } + System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis())); + } + + // test mock zkclient event + @Test(dependsOnMethods = "testConnectState") + public void testReConnectSucceed() throws InterruptedException { + System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis())); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) { + CountDownLatch countDownLatch = new CountDownLatch(1); + + zkMetaClient.connect(); + // We need a separate thread to simulate reconnect. In ZkClient there is assertion to check + // reconnect and and CRUDs are not in the same thread. (So one does not block another) + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + simulateZkStateReconnected(zkMetaClient.getZkClient()); + } catch (InterruptedException e) { + Assert.fail("Exception in simulateZkStateReconnected", e); + } + countDownLatch.countDown(); + } + }); + countDownLatch.await(5000, TimeUnit.SECONDS); + Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD); + // When ZK reconnect happens within timeout window, zkMetaClient should ba able to perform CRUD. 
+ Assert.assertTrue(zkMetaClient.getZkClient().getConnection().getZookeeperState().isConnected()); + zkMetaClient.create("/key", "value"); + Assert.assertEquals(zkMetaClient.get("/key"), "value"); + } + System.out.println("END TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis())); + } + + @Test(dependsOnMethods = "testReConnectSucceed") + public void testConnectStateChangeListener() throws Exception { + System.out.println("START TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis())); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) { + CountDownLatch countDownLatch = new CountDownLatch(1); + final MetaClientInterface.ConnectState[] connectState = + new MetaClientInterface.ConnectState[2]; + ConnectStateChangeListener listener = new ConnectStateChangeListener() { + @Override + public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, + MetaClientInterface.ConnectState currentState) throws Exception { + connectState[0] = prevState; + connectState[1] = currentState; + countDownLatch.countDown(); + } + + @Override + public void handleConnectionEstablishmentError(Throwable error) throws Exception { + + } + }; + Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener)); + zkMetaClient.connect(); + countDownLatch.await(5000, TimeUnit.SECONDS); + Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED); + Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED); + + _zkServer.shutdown(); + Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD); + Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.CONNECTED); + Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.DISCONNECTED); + + try { + zkMetaClient.create("/key", "value"); + Assert.fail("Create call after close should throw IllegalStateException"); + } catch (Exception ex) { + 
Assert.assertTrue(ex.getCause() instanceof IllegalStateException); + } + } + System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis())); + } + + static ZkMetaClient<String> createZkMetaClientReconnectTest() { + ZkMetaClientConfig config = + new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) + .setMetaClientReconnectPolicy( + new ExponentialBackoffReconnectPolicy( + AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST)) + .build(); + return new ZkMetaClient<>(config); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index cdb894b34..6eb624fb8 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -52,9 +52,6 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS public class TestZkMetaClient extends ZkMetaClientTestBase{ - private static final String ZK_ADDR = "localhost:2183"; - private static final int DEFAULT_TIMEOUT_MS = 1000; - private static final String ENTRY_STRING_VALUE = "test-value"; private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath"; private static final String TEST_INVALID_PATH = "/_invalid/a/b/c"; @@ -351,35 +348,6 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ } } - @Test - public void testConnectStateChangeListener() throws Exception { - final String basePath = "/TestZkMetaClient_testConnectionStateChangeListener"; - try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { - CountDownLatch countDownLatch = new CountDownLatch(1); - final MetaClientInterface.ConnectState[] connectState = - new MetaClientInterface.ConnectState[2]; - ConnectStateChangeListener listener = new ConnectStateChangeListener() { - 
@Override - public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, - MetaClientInterface.ConnectState currentState) throws Exception { - connectState[0] = prevState; - connectState[1] = currentState; - countDownLatch.countDown(); - } - - @Override - public void handleConnectionEstablishmentError(Throwable error) throws Exception { - - } - }; - Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener)); - zkMetaClient.connect(); - countDownLatch.await(5000, TimeUnit.SECONDS); - Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED); - Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED); - } - } - /** * Transactional op calls zk.multi() with a set of ops (operations) * and the return values are converted into metaclient opResults. diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java index 51c655602..e00eb9e83 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; +import org.apache.helix.metaclient.factories.MetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; import org.apache.helix.zookeeper.zkclient.ZkServer; @@ -68,7 +69,7 @@ public abstract class ZkMetaClientTestBase { return new ZkMetaClient<>(config); } - protected static ZkServer startZkServer(final String zkAddress) { + public static ZkServer startZkServer(final String zkAddress) { String zkDir = zkAddress.replace(':', '_'); final String logDir = "/tmp/" + zkDir + "/logs"; final String dataDir = "/tmp/" + zkDir + "/dataDir"; diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 51904ede6..f87edda2b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -2615,6 +2615,7 @@ public class ZkClient implements Watcher { } _eventThread.interrupt(); _eventThread.join(2000); + // TODO: Closing _event thread here will miss final `CLOSE` state change. if (isManagingZkConnection()) { LOG.info("Closing zkclient uid:{}, zk:{}", _uid, ((ZkConnection) connection).getZookeeper()); connection.close();