This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6374c6a8497f58a3d3d31adfda19cacbce8bb6b9 Author: xyuanlu <[email protected]> AuthorDate: Thu Mar 9 16:28:31 2023 -0800 Add support for state change in ZkMetaClient Add support for state change in ZkMetaClient --- .../helix/metaclient/api/MetaClientInterface.java | 15 +++-- .../helix/metaclient/impl/zk/ZkMetaClient.java | 6 +- .../zk/adapter/StateChangeListenerAdapter.java | 76 ++++++++++++++++++++++ .../metaclient/impl/zk/util/ZkMetaClientUtil.java | 26 +++++++- .../helix/metaclient/impl/zk/TestZkMetaClient.java | 51 ++++++++++----- .../helix/zookeeper/zkclient/IZkStateListener.java | 4 ++ .../apache/helix/zookeeper/zkclient/ZkClient.java | 9 +-- 7 files changed, 158 insertions(+), 29 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index 433184de6..7445a0d33 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -56,6 +56,9 @@ public interface MetaClientInterface<T> { } enum ConnectState { + // Client is not connected to server. Before initiating connection or after close. + NOT_CONNECTED, + // Client is connected to server CONNECTED, @@ -65,11 +68,15 @@ public interface MetaClientInterface<T> { // Server has expired this connection. EXPIRED, - // When client failed to connect server. - INIT_FAILED, - // When client explicitly call disconnect. - CLOSED_BY_CLIENT + CLOSED_BY_CLIENT, + + // Connection between client and server is lost. + DISCONNECTED, + + // Client is authenticated. They can perform operation with authorized permissions. + // This state is not in use as of now. 
+ AUTHENTICATED } /** diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 7dedd02d5..7934fee8a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -36,6 +36,7 @@ import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter; import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter; +import org.apache.helix.metaclient.impl.zk.adapter.StateChangeListenerAdapter; import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler; import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler; import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler; @@ -285,7 +286,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public boolean subscribeStateChanges(ConnectStateChangeListener listener) { - return false; + _zkClient.subscribeStateChanges(new StateChangeListenerAdapter(listener)); + return true; } @Override @@ -310,7 +312,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void unsubscribeConnectStateChanges(ConnectStateChangeListener listener) { - + _zkClient.subscribeStateChanges(new StateChangeListenerAdapter(listener)); } @Override diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java new file mode 100644 index 000000000..8ad324bb2 --- /dev/null +++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java @@ -0,0 +1,76 @@ +package org.apache.helix.metaclient.impl.zk.adapter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.ConnectStateChangeListener; +import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil; +import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.zookeeper.Watcher; + + +public class StateChangeListenerAdapter implements IZkStateListener { + private final ConnectStateChangeListener _listener; + + public StateChangeListenerAdapter(ConnectStateChangeListener listener) { + _listener = listener; + } + + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void handleNewSession(String sessionId) throws Exception { + // This function will be invoked when connection is established. It is a no-op for metaclient. + // MetaClient will not expose this to the user, as 'handleStateChanged' already covers state change + // notification for new connection establishment. 
+ } + + @Override + public void handleSessionEstablishmentError(Throwable error) throws Exception { + _listener.handleConnectionEstablishmentError(error); + } + + @Override + public void handleStateChanged(Watcher.Event.KeeperState prevState, + Watcher.Event.KeeperState curState) throws Exception { + _listener.handleConnectStateChanged( + ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(prevState), + ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(curState)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StateChangeListenerAdapter that = (StateChangeListenerAdapter) o; + return _listener.equals(that._listener); + } + + @Override + public int hashCode() { + return _listener.hashCode(); + } +} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java index a733ba4fb..a9bee4cbb 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java @@ -42,6 +42,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.server.EphemeralType; @@ -52,7 +53,7 @@ public class ZkMetaClientUtil { private static final List<ACL> DEFAULT_ACL = Collections.unmodifiableList(ZooDefs.Ids.OPEN_ACL_UNSAFE); - private ZkMetaClientUtil(){ + private ZkMetaClientUtil() { } /** @@ -239,6 +240,29 @@ public class ZkMetaClientUtil { return new MetaClientException(e); } + public static MetaClientInterface.ConnectState 
translateKeeperStateToMetaClientConnectState( + Watcher.Event.KeeperState keeperState) { + if (keeperState == null) + return MetaClientInterface.ConnectState.NOT_CONNECTED; + switch (keeperState) { + case AuthFailed: + return MetaClientInterface.ConnectState.AUTH_FAILED; + case Closed: + return MetaClientInterface.ConnectState.CLOSED_BY_CLIENT; + case Disconnected: + return MetaClientInterface.ConnectState.DISCONNECTED; + case Expired: + return MetaClientInterface.ConnectState.EXPIRED; + case SaslAuthenticated: + return MetaClientInterface.ConnectState.AUTHENTICATED; + case SyncConnected: + case ConnectedReadOnly: + return MetaClientInterface.ConnectState.CONNECTED; + default: + throw new IllegalArgumentException(keeperState + " is not a supported."); + } + } + /** * This function translate and group Zk exception code to metaclient code. * It currently includes all ZK code on 3.6.3. diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index f23ed0f03..49cbae1f3 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -19,43 +19,29 @@ package org.apache.helix.metaclient.impl.zk; * under the License. 
*/ -import java.io.File; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Set; - -import org.apache.helix.metaclient.api.DataUpdater; -import org.apache.helix.metaclient.api.MetaClientInterface; -import org.apache.helix.metaclient.exception.MetaClientException; -import java.util.HashMap; import java.util.Map; import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.helix.metaclient.api.DataUpdater; -import org.apache.helix.metaclient.api.DirectChildChangeListener; -import org.apache.helix.metaclient.api.MetaClientInterface; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.NotImplementedException; +import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; +import org.apache.helix.metaclient.api.DataUpdater; +import org.apache.helix.metaclient.api.DirectChildChangeListener; +import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; -import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; -import org.apache.helix.zookeeper.zkclient.ZkServer; import org.apache.zookeeper.KeeperException; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER; @@ -334,6 +320,35 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ } } + @Test + public void testConnectStateChangeListener() throws 
Exception { + final String basePath = "/TestZkMetaClient_testConnectionStateChangeListener"; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + CountDownLatch countDownLatch = new CountDownLatch(1); + final MetaClientInterface.ConnectState[] connectState = + new MetaClientInterface.ConnectState[2]; + ConnectStateChangeListener listener = new ConnectStateChangeListener() { + @Override + public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, + MetaClientInterface.ConnectState currentState) throws Exception { + connectState[0] = prevState; + connectState[1] = currentState; + countDownLatch.countDown(); + } + + @Override + public void handleConnectionEstablishmentError(Throwable error) throws Exception { + + } + }; + Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener)); + zkMetaClient.connect(); + countDownLatch.await(5000, TimeUnit.SECONDS); + Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED); + Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED); + } + } + /** * Transactional op calls zk.multi() with a set of ops (operations) * and the return values are converted into metaclient opResults. diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java index 5970e623d..f2c4190de 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java @@ -58,4 +58,8 @@ public interface IZkStateListener { * On any error. 
*/ void handleSessionEstablishmentError(final Throwable error) throws Exception; + + default void handleStateChanged(KeeperState prevState, KeeperState curState) throws Exception { + handleStateChanged(curState); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 31fe4e97a..51904ede6 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -1486,12 +1486,13 @@ public class ZkClient implements Watcher { protected void processStateChanged(WatchedEvent event) { LOG.info("zkclient {}, zookeeper state changed ( {} )", _uid, event.getState()); + KeeperState prevState = _currentState; setCurrentState(event.getState()); if (getShutdownTrigger()) { return; } - fireStateChangedEvent(event.getState()); + fireStateChangedEvent(prevState, event.getState()); /* * Note, the intention is that only the ZkClient managing the session would do auto reconnect @@ -1660,15 +1661,15 @@ public class ZkClient implements Watcher { } } - protected void fireStateChangedEvent(final KeeperState state) { + protected void fireStateChangedEvent(final KeeperState prevState, final KeeperState curState) { final String sessionId = getHexSessionId(); for (final IZkStateListener stateListener : _stateListener) { - final String description = "State changed to " + state + " sent to " + stateListener; + final String description = "State changed to " + curState + " sent to " + stateListener; _eventThread.send(new ZkEventThread.ZkEvent(description, sessionId) { @Override public void run() throws Exception { - stateListener.handleStateChanged(state); + stateListener.handleStateChanged(prevState, curState); } }); }
