This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 79bd72dd201c998618e0a3a29831b98588846171 Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Mon Jan 23 18:44:44 2023 -0500 Create adapter package for data and child change listener and prepare zkclient (#2346) Prepare zkclient and implement new adapter for child change listener. --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 59 +---------------- .../impl/zk/adapter/ChildListenerAdapter.java | 75 +++++++++++++++++++++ .../impl/zk/adapter/DataListenerAdapter.java | 77 ++++++++++++++++++++++ .../helix/zookeeper/zkclient/IZkChildListener.java | 15 +++++ .../apache/helix/zookeeper/zkclient/ZkClient.java | 9 +-- 5 files changed, 175 insertions(+), 60 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 00ad18f73..631bd1a3a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -38,9 +38,9 @@ import org.apache.helix.metaclient.constants.MetaClientException; import org.apache.helix.metaclient.constants.MetaClientInterruptException; import org.apache.helix.metaclient.constants.MetaClientNoNodeException; import org.apache.helix.metaclient.constants.MetaClientTimeoutException; +import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.impl.client.ZkClient; -import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.helix.zookeeper.zkclient.exception.ZkException; @@ -49,7 +49,6 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.EphemeralType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,7 +260,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { if (!persistListener) { throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient."); } - _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener)); + _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener)); return false; } @@ -286,7 +285,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void unsubscribeDataChange(String key, DataChangeListener listener) { - _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener)); + _zkClient.unsubscribeDataChanges(key, new DataListenerAdapter(listener)); } @Override @@ -339,58 +338,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { disconnect(); } - /** - * A converter class to transform {@link DataChangeListener} to {@link IZkDataListener} - */ - static class DataListenerConverter implements IZkDataListener { - private final DataChangeListener _listener; - - DataListenerConverter(DataChangeListener listener) { - _listener = listener; - } - - private DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) { - switch (eventType) { - case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED; - case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE; - case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED; - default: throw new IllegalArgumentException("EventType " + eventType + " is not supported."); - } - } - - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported."); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted); - } - - @Override - public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception { - _listener.handleDataChange(dataPath, data, convertType(eventType)); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DataListenerConverter that = (DataListenerConverter) o; - return _listener.equals(that._listener); - } - - @Override - public int hashCode() { - return _listener.hashCode(); - } - } - private static MetaClientException translateZkExceptionToMetaclientException(ZkException e) { if (e instanceof ZkNodeExistsException) { return new MetaClientNoNodeException(e); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java new file mode 100644 index 000000000..28385ff2a --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java @@ -0,0 +1,75 @@ +package org.apache.helix.metaclient.impl.zk.adapter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import org.apache.helix.metaclient.api.ChildChangeListener; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; +import org.apache.zookeeper.Watcher; + + +/** + * A adapter class to transform {@link ChildChangeListener} to {@link IZkChildListener}. + */ +public class ChildListenerAdapter implements IZkChildListener { + private final ChildChangeListener _listener; + + public ChildListenerAdapter(ChildChangeListener listener) { + _listener = listener; + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { + throw new UnsupportedOperationException("handleChildChange(String parentPath, List<String> currentChilds) " + + "is not supported"); + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType) + throws Exception { + _listener.handleChildChange(parentPath, convertType(eventType)); + } + + private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) { + switch (eventType) { + case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED; + case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE; + case NodeDeleted: return ChildChangeListener.ChangeType.ENTRY_DELETED; + default: throw new IllegalArgumentException("EventType " + eventType + " is not supported."); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ChildListenerAdapter that = (ChildListenerAdapter) o; + return _listener.equals(that._listener); + } + + @Override + public int hashCode() { + return _listener.hashCode(); + } +} diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java new file mode 100644 index 000000000..94ae198ce --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java @@ -0,0 +1,77 @@ +package org.apache.helix.metaclient.impl.zk.adapter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.DataChangeListener; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.zookeeper.Watcher; + + +/** + * A Adapter class to transform {@link DataChangeListener} to {@link IZkDataListener} + */ +public class DataListenerAdapter implements IZkDataListener { + private final DataChangeListener _listener; + + public DataListenerAdapter(DataChangeListener listener) { + _listener = listener; + } + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + throw new UnsupportedOperationException("handleDataChange(String dataPath, Object data) is not supported."); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + handleDataChange(dataPath, null, Watcher.Event.EventType.NodeDeleted); + } + + @Override + public void handleDataChange(String dataPath, Object data, Watcher.Event.EventType eventType) throws Exception { + _listener.handleDataChange(dataPath, data, convertType(eventType)); + } + + private static DataChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) { + switch (eventType) { + case NodeCreated: return DataChangeListener.ChangeType.ENTRY_CREATED; + case NodeDataChanged: return DataChangeListener.ChangeType.ENTRY_UPDATE; + case NodeDeleted: return DataChangeListener.ChangeType.ENTRY_DELETED; + default: throw new IllegalArgumentException("EventType " + eventType + " is not supported."); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataListenerAdapter that = (DataListenerAdapter) o; + return _listener.equals(that._listener); + } + + @Override + public int hashCode() { + return _listener.hashCode(); + } +} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java index 7623a2ec1..a150f1643 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkChildListener.java @@ -20,6 +20,8 @@ package org.apache.helix.zookeeper.zkclient; */ import java.util.List; +import org.apache.zookeeper.Watcher; + /** * An {@link IZkChildListener} can be registered at a {@link ZkClient} for listening on zk child changes for a given @@ -42,4 +44,17 @@ public interface IZkChildListener { * @throws Exception */ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception; + + /** + * Called when the children of the given path changed. + * + * @param parentPath The parent path + * @param currentChilds The children or null if the root node (parent path) was deleted. + * @param eventType The zookeeper event type + * @throws Exception + */ + default void handleChildChange(String parentPath, List<String> currentChilds, Watcher.Event.EventType eventType) + throws Exception { + handleChildChange(parentPath, currentChilds); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 747b7e7a0..1c00f9768 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -1316,7 +1316,7 @@ public class ZkClient implements Watcher { private void fireAllEvents(WatchedEvent event) { //TODO: During handling new session, if the path is deleted, watcher leakage could still happen for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { - fireChildChangedEvents(entry.getKey(), entry.getValue(), true); + fireChildChangedEvents(entry.getKey(), entry.getValue(), true, event.getType()); } for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) { fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true, event.getType()); @@ -1758,7 +1758,7 @@ public class ZkClient implements Watcher { if (childListeners != null && !childListeners.isEmpty()) { // TODO recording child changed event propagation latency as well. Note this change will // introduce additional ZK access. - fireChildChangedEvents(path, childListeners, pathExists); + fireChildChangedEvents(path, childListeners, pathExists, event.getType()); } } @@ -1826,7 +1826,8 @@ public class ZkClient implements Watcher { } } - private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists) { + private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists, + EventType eventType) { try { final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path); for (final IZkChildListener listener : childListeners) { @@ -1853,7 +1854,7 @@ public class ZkClient implements Watcher { // Continue trigger the change handler } } - listener.handleChildChange(path, children); + listener.handleChildChange(path, children, eventType); } }); }
