This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit c48dd61973721032d1c58de71bda628aa6e1ce85 Author: xyuanlu <[email protected]> AuthorDate: Wed Apr 5 23:11:22 2023 -0700 Refactor ZkClient for persist watch (#2426) Refactor ZkClient for persist watch --- .../helix/zookeeper/zkclient/IZkConnection.java | 5 ++ .../apache/helix/zookeeper/zkclient/ZkClient.java | 92 +++++++++++++--------- .../helix/zookeeper/zkclient/ZkConnection.java | 13 +++ 3 files changed, 73 insertions(+), 37 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java index e766bf7d9..bd94432b4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java @@ -21,6 +21,7 @@ package org.apache.helix.zookeeper.zkclient; import java.util.List; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -63,4 +64,8 @@ public interface IZkConnection { public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException; public void addAuthInfo(String scheme, byte[] auth); + + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException, + InterruptedException; + public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType) throws InterruptedException, KeeperException; } \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index f87edda2b..dbb73864b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -76,6 +76,7 @@ import 
org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * "Native ZkClient": not to be used directly. * @@ -259,14 +260,9 @@ public class ZkClient implements Watcher { } public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) { - synchronized (_childListener) { - Set<IZkChildListener> listeners = _childListener.get(path); - if (listeners == null) { - listeners = new CopyOnWriteArraySet<>(); - _childListener.put(path, listeners); + synchronized (_childListener) { + addChildListener(path, listener); } - listeners.add(listener); - } List<String> children = watchForChilds(path, skipWatchingNonExistNode); if (children == null && skipWatchingNonExistNode) { @@ -279,32 +275,15 @@ public class ZkClient implements Watcher { } public void unsubscribeChildChanges(String path, IZkChildListener childListener) { - synchronized (_childListener) { - final Set<IZkChildListener> listeners = _childListener.get(path); - if (listeners != null) { - listeners.remove(childListener); - } + synchronized (_childListener) { + removeChildListener(path, childListener); } } public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) { - Set<IZkDataListenerEntry> listenerEntries; - synchronized (_dataListener) { - listenerEntries = _dataListener.get(path); - if (listenerEntries == null) { - listenerEntries = new CopyOnWriteArraySet<>(); - _dataListener.put(path, listenerEntries); - } - boolean prefetchEnabled = isPrefetchEnabled(listener); - IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled); - listenerEntries.add(listenerEntry); - if (prefetchEnabled) { - if (LOG.isDebugEnabled()) { - LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}", - _uid, path, listener, prefetchEnabled); - } - } + synchronized (_dataListener) { + addDataListener(path, listener); } boolean 
watchInstalled = watchForData(path, skipWatchingNonExistNode); @@ -355,16 +334,9 @@ public class ZkClient implements Watcher { } public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { - synchronized (_dataListener) { - final Set<IZkDataListenerEntry> listeners = _dataListener.get(path); - if (listeners != null) { - IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener); - listeners.remove(listenerEntry); + synchronized (_dataListener) { + removeDataListener(path, dataListener); } - if (listeners == null || listeners.isEmpty()) { - _dataListener.remove(path); - } - } } public void subscribeStateChanges(final IZkStateListener listener) { @@ -2907,4 +2879,50 @@ public class ZkClient implements Watcher { + " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT); } } + + private void addDataListener(String path, IZkDataListener listener) { + Set<IZkDataListenerEntry> listenerEntries; + listenerEntries = _dataListener.get(path); + if (listenerEntries == null) { + listenerEntries = new CopyOnWriteArraySet<>(); + _dataListener.put(path, listenerEntries); + } + + boolean prefetchEnabled = isPrefetchEnabled(listener); + IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled); + listenerEntries.add(listenerEntry); + if (prefetchEnabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}", _uid, + path, listener, prefetchEnabled); + } + } + } + + private void removeDataListener(String path, IZkDataListener dataListener) { + final Set<IZkDataListenerEntry> listeners = _dataListener.get(path); + if (listeners != null) { + IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener); + listeners.remove(listenerEntry); + } + if (listeners == null || listeners.isEmpty()) { + _dataListener.remove(path); + } + } + + private void addChildListener(String path, IZkChildListener listener) { + Set<IZkChildListener> listeners 
= _childListener.get(path); + if (listeners == null) { + listeners = new CopyOnWriteArraySet<>(); + _childListener.put(path, listeners); + } + listeners.add(listener); + } + + private void removeChildListener(String path, IZkChildListener listener) { + final Set<IZkChildListener> listeners = _childListener.get(path); + if (listeners != null) { + listeners.remove(listener); + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java index 01935919c..2f7ac27de 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys; import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -252,6 +253,18 @@ public class ZkConnection implements IZkConnection { _getChildrenMethod.getName()); } + @Override + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) + throws KeeperException, InterruptedException { + _zk.addWatch(basePath, watcher, mode); + } + + @Override + public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType) + throws InterruptedException, KeeperException { + _zk.removeWatches(path, watcher, watcherType, true); + } + private Method doLookUpGetChildrenMethod() { if (!GETCHILDREN_PAGINATION_DISABLED) { try {
