This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit f6e194f11251e9ab39b15f8d54f71c6cd2059703 Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Mon Jan 9 12:56:54 2023 -0500 New features and improvement in zookeeper-api to prepare meta-client implementation (#2333) Prepare zkclient for meta-client DataChangeListener add new method in IZkDataListener Implement boolean flag connectOnInit in zkclient for decoupling Override equals and hashcode for the converter class --- .github/workflows/Helix-PR-CI.yml | 2 +- .../helix/metaclient/impl/zk/ZkMetaClient.java | 54 ++++++++++++++++++++++ .../helix/zookeeper/impl/client/ZkClient.java | 26 +++++++++-- .../helix/zookeeper/zkclient/IZkDataListener.java | 7 +++ .../apache/helix/zookeeper/zkclient/ZkClient.java | 43 +++++++++++------ .../zookeeper/zkclient/metric/ZkClientMonitor.java | 6 +++ 6 files changed, 119 insertions(+), 19 deletions(-) diff --git a/.github/workflows/Helix-PR-CI.yml b/.github/workflows/Helix-PR-CI.yml index c061c5ecf..d0ef2752c 100644 --- a/.github/workflows/Helix-PR-CI.yml +++ b/.github/workflows/Helix-PR-CI.yml @@ -1,7 +1,7 @@ name: Helix PR CI on: pull_request: - branches: [ master ] + branches: [ master, metaclient ] # TODO: remove side branch paths-ignore: - '.github/**' - 'helix-front/**' diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 194cf18f9..b3270ad81 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -33,7 +33,9 @@ import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; +import org.apache.zookeeper.Watcher; public class ZkMetaClient implements MetaClientInterface { @@ -247,4 +249,56 @@ public class ZkMetaClient implements MetaClientInterface { public List<OpResult> transactionOP(Iterable iterable) { return null; } + + /** + * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener} + */ + static class DataListenerConverter implements IZkDataListener { + private final DataChangeListener _listener; + + DataListenerConverter(DataChangeListener listener) { + _listener = listener; + } + + private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) { + switch (eventType) { + case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED; + case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE; + case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED; + default: throw new IllegalArgumentException("EventType " + eventType + " is not supported."); + } + } + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported."); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted); + } + + @Override + public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception { + _listener.handleDataChange(dataPath, data, convertType(eventType)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataListenerConverter that = (DataListenerConverter) o; + return _listener.equals(that._listener); + } + + @Override + public int hashCode() { + return _listener.hashCode(); + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java index 915a60777..f3742b55b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java @@ -87,13 +87,23 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple * The JMX bean name will be: HelixZkClient.monitorType.monitorKey.monitorInstanceName. * @param monitorRootPathOnly * Should only stat of access to root path be reported to JMX bean or path-specific stat be reported too. + * @param connectOnInit true if connect to ZK during initialization, otherwise user will need to call connect + * explicitly before talking to ZK. */ public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, String monitorInstanceName, - boolean monitorRootPathOnly) { + boolean monitorRootPathOnly, boolean connectOnInit) { super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, - monitorKey, monitorInstanceName, monitorRootPathOnly); + monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit); + } + + public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, + PathBasedZkSerializer zkSerializer, + String monitorType, String monitorKey, String monitorInstanceName, + boolean monitorRootPathOnly) { + this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey, + monitorInstanceName, monitorRootPathOnly, true); } public ZkClient(IZkConnection connection, int connectionTimeout, @@ -189,6 +199,16 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple String _monitorKey; String _monitorInstanceName = null; boolean _monitorRootPathOnly = true; + boolean _connectOnInit = true; + + /** + * If set true, the client will connect to ZK during initialization. + * Otherwise, user has to call connect() method explicitly before talking to ZK. + */ + public Builder setConnectOnInit(boolean connectOnInit) { + _connectOnInit = connectOnInit; + return this; + } public Builder setConnection(IZkConnection connection) { this._connection = connection; @@ -273,7 +293,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple } return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer, - _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly); + _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit); } } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java index 6d90e8de8..0a3e8b3e5 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkDataListener.java @@ -19,6 +19,9 @@ package org.apache.helix.zookeeper.zkclient; * under the License. */ +import org.apache.zookeeper.Watcher; + + /** * An {@link IZkDataListener} can be registered at a {@link ZkClient} for listening on zk data changes for a given path. * @@ -31,4 +34,8 @@ public interface IZkDataListener { public void handleDataChange(String dataPath, Object data) throws Exception; public void handleDataDeleted(String dataPath) throws Exception; + + default void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception { + handleDataChange(dataPath, data); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 527e46f90..4fca90da9 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -214,7 +214,7 @@ public class ZkClient implements Watcher { protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, - String monitorInstanceName, boolean monitorRootPathOnly) { + String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) { if (zkConnection == null) { throw new NullPointerException("Zookeeper connection is null!"); } @@ -240,17 +240,18 @@ public class ZkClient implements Watcher { LOG.info("ZkClient monitor key or type is not provided. Skip monitoring."); } - connect(connectionTimeout, this); - - try { - if (_monitor != null) { - _monitor.register(); - } - } catch (JMException e){ - LOG.error("Error in creating ZkClientMonitor", e); + if (connectOnInit) { + connect(connectionTimeout, this); } } + protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, + PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, + String monitorInstanceName, boolean monitorRootPathOnly) { + this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey, + monitorInstanceName, monitorRootPathOnly, true); + } + public List<String> subscribeChildChanges(String path, IZkChildListener listener) { ChildrenSubscribeResult result = subscribeChildChanges(path, listener, false); return result.getChildren(); @@ -1312,13 +1313,13 @@ public class ZkClient implements Watcher { } } - private void fireAllEvents() { + private void fireAllEvents(WatchedEvent event) { //TODO: During handling new session, if the path is deleted, watcher leakage could still happen for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { fireChildChangedEvents(entry.getKey(), entry.getValue(), true); } for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) { - fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true); + fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType()); } } @@ -1518,7 +1519,7 @@ public class ZkClient implements Watcher { * reconnecting when the session expired. Because previous session expired, we also have to * notify all listeners that something might have changed. */ - fireAllEvents(); + fireAllEvents(event); } } else if (event.getState() == KeeperState.Expired) { _isNewSessionEventFired = false; @@ -1766,13 +1767,13 @@ public class ZkClient implements Watcher { Set<IZkDataListenerEntry> listeners = _dataListener.get(path); if (listeners != null && !listeners.isEmpty()) { fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime), - pathExists); + pathExists, event.getType()); } } } private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners, - final OptionalLong notificationTime, boolean pathExists) { + final OptionalLong notificationTime, boolean pathExists, EventType eventType) { try { final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path); // Trigger listener callbacks @@ -1815,7 +1816,7 @@ public class ZkClient implements Watcher { return; } } - listener.getDataListener().handleDataChange(path, data); + listener.getDataListener().handleDataChange(path, data, eventType); } } }); @@ -2488,6 +2489,11 @@ public class ZkClient implements Watcher { }); } + protected void connect(final long maxMsToWaitUntilConnected) + throws ZkInterruptedException, ZkTimeoutException, IllegalStateException { + connect(maxMsToWaitUntilConnected, this); + } + /** * Connect to ZooKeeper. * @param maxMsToWaitUntilConnected @@ -2553,6 +2559,13 @@ public class ZkClient implements Watcher { close(); } } + try { + if (_monitor != null) { + _monitor.register(); + } + } catch (JMException e){ + LOG.error("Error in creating ZkClientMonitor", e); + } } public long getCreationTime(String path) { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java index 13231c842..c3b338690 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java @@ -55,6 +55,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider { private String _monitorKey; private String _monitorInstanceName; private boolean _monitorRootOnly; + private volatile boolean _registered = false; private SimpleDynamicMetric<Long> _stateChangeEventCounter; private SimpleDynamicMetric<Long> _expiredSessionCounter; @@ -123,6 +124,9 @@ public class ZkClientMonitor extends DynamicMBeanProvider { @Override public DynamicMBeanProvider register() throws JMException { + if (_registered) { + return this; + } List<DynamicMetric<?, ?>> attributeList = new ArrayList<>(); attributeList.add(_dataChangeEventCounter); attributeList.add(_outstandingRequestGauge); @@ -143,6 +147,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider { } } }); + _registered = true; return this; } @@ -154,6 +159,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider { for (ZkClientPathMonitor zkClientPathMonitor : _zkClientPathMonitorMap.values()) { zkClientPathMonitor.unregister(); } + _registered = false; } @Override
