This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5ed14aefceea8064fbe369b1b200495fe9f2de9d Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Wed Feb 15 14:20:34 2023 -0500 Create separate API for persistent and one-time listener (#2376) Create separate API for persistent and one-time listener --- .../helix/metaclient/api/MetaClientInterface.java | 75 +++++++++++++++++----- .../helix/metaclient/impl/zk/ZkMetaClient.java | 18 ++---- .../helix/metaclient/impl/zk/TestZkMetaClient.java | 8 +-- 3 files changed, 68 insertions(+), 33 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index dc310b0ef..a4c5113f2 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -22,6 +22,7 @@ package org.apache.helix.metaclient.api; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.metaclient.exception.MetaClientInterruptException; import org.apache.helix.metaclient.exception.MetaClientTimeoutException; @@ -378,61 +379,105 @@ public interface MetaClientInterface<T> { /** * Subscribe change of a particular entry. Including entry data change, entry deletion and creation * of the given key. + * The listener should be permanent until it's unsubscribed. * @param key Key to identify the entry * @param listener An implementation of DataChangeListener * @see org.apache.helix.metaclient.api.DataChangeListener * @param skipWatchingNonExistNode Will not register lister to an non-exist key if set to true. * Please set to false if you are expecting ENTRY_CREATED type. - * @param persistListener The listener will persist when set to true. Otherwise it will be a one - * time triggered listener. * @return Return an boolean indication if subscribe succeeded. */ - boolean subscribeDataChange(String key, DataChangeListener listener, - boolean skipWatchingNonExistNode, boolean persistListener); + boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode); + + /** + * Subscribe a one-time change of a particular entry. Including entry data change, entry deletion and creation + * of the given key. + * The implementation should use at-most-once delivery semantic. + * @param key Key to identify the entry + * @param listener An implementation of DataChangeListener + * @see org.apache.helix.metaclient.api.DataChangeListener + * @param skipWatchingNonExistNode Will not register lister to an non-exist key if set to true. + * Please set to false if you are expecting ENTRY_CREATED type. + * @return Return an boolean indication if subscribe succeeded. + */ + default boolean subscribeOneTimeDataChange(String key, DataChangeListener listener, + boolean skipWatchingNonExistNode) { + throw new NotImplementedException("subscribeOneTimeDataChange is not implemented"); + } /** * Subscribe for direct child change event on a particular key. It includes new child * creation or deletion. It does not include existing child data change. + * The listener should be permanent until it's unsubscribed. * For hierarchy key spaces like zookeeper, it refers to an entry's direct children nodes. * For flat key spaces, it refers to keys that matches `prefix*separator`. * @param key key to identify the entry. * @param listener An implementation of DirectSubEntryChangeListener. * @see org.apache.helix.metaclient.api.DirectChildChangeListener * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered. - * @param persistListener The listener will persist when set to true. Otherwise it will be a one - * time triggered listener. * * @return Return an DirectSubEntrySubscribeResult. It will contain a list of direct sub children if * subscribe succeeded. */ DirectChildSubscribeResult subscribeDirectChildChange(String key, - DirectChildChangeListener listener, boolean skipWatchingNonExistNode, - boolean persistListener); + DirectChildChangeListener listener, boolean skipWatchingNonExistNode); + + /** + * Subscribe for a one-time direct child change event on a particular key. It includes new child + * creation or deletion. It does not include existing child data change. + * The implementation should use at-most-once delivery semantic. + * For hierarchy key spaces like zookeeper, it refers to an entry's direct children nodes. + * For flat key spaces, it refers to keys that matches `prefix*separator`. + * + * @param key key to identify the entry. + * @param listener An implementation of DirectSubEntryChangeListener. + * @see org.apache.helix.metaclient.api.DirectChildChangeListener + * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered. + * + * @return Return an DirectSubEntrySubscribeResult. It will contain a list of direct sub children if + * subscribe succeeded. + */ + default DirectChildSubscribeResult subscribeOneTimeDirectChildChange(String key, + DirectChildChangeListener listener, boolean skipWatchingNonExistNode) { + throw new NotImplementedException("subscribeOneTimeDirectChildChange is not implemented"); + } /** * Subscribe for connection state change. + * The listener should be permanent until it's unsubscribed. * @param listener An implementation of ConnectStateChangeListener. * @see org.apache.helix.metaclient.api.ConnectStateChangeListener - * @param persistListener The listener will persist when set to true. Otherwise it will be a one - * time triggered listener. * * @return Return an boolean indication if subscribe succeeded. */ - boolean subscribeStateChanges(ConnectStateChangeListener listener, boolean persistListener); + boolean subscribeStateChanges(ConnectStateChangeListener listener); /** * Subscribe change for all children including entry change and data change. + * The listener should be permanent until it's unsubscribed. + * For hierarchy key spaces like zookeeper, it would watch the whole tree structure. + * For flat key spaces, it would watch for keys with certain prefix. + * @param key key to identify the entry. + * @param listener An implementation of ChildChangeListener. + * @see org.apache.helix.metaclient.api.ChildChangeListener + * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered. + */ + boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode); + + /** + * Subscribe a one-time change for all children including entry change and data change. + * The implementation should use at-most-once delivery semantic. * For hierarchy key spaces like zookeeper, it would watch the whole tree structure. * For flat key spaces, it would watch for keys with certain prefix. * @param key key to identify the entry. * @param listener An implementation of ChildChangeListener. * @see org.apache.helix.metaclient.api.ChildChangeListener * @param skipWatchingNonExistNode If the passed in key does not exist, no listener wil be registered. - * @param persistListener The listener will persist when set to true. Otherwise it will be a one - * time triggered listener. */ - boolean subscribeChildChanges(String key, ChildChangeListener listener, - boolean skipWatchingNonExistNode, boolean persistListener); + default boolean subscribeOneTimeChildChanges(String key, ChildChangeListener listener, + boolean skipWatchingNonExistNode) { + throw new NotImplementedException("subscribeOneTimeChildChanges is not implemented"); + } /** * Unsubscribe the listener to further changes. No-op if the listener is not subscribed to the key. diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index fea40bcde..e042368d3 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -253,36 +253,26 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } @Override - public boolean subscribeDataChange(String key, DataChangeListener listener, - boolean skipWatchingNonExistNode, boolean persistListener) { - if (!persistListener) { - throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient."); - } + public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode) { _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener)); return false; } @Override public DirectChildSubscribeResult subscribeDirectChildChange(String key, - DirectChildChangeListener listener, boolean skipWatchingNonExistNode, - boolean persistListener) { - if (!persistListener) { - throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient."); - } + DirectChildChangeListener listener, boolean skipWatchingNonExistNode) { ChildrenSubscribeResult result = _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode); return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled()); } @Override - public boolean subscribeStateChanges(ConnectStateChangeListener listener, - boolean persistListener) { + public boolean subscribeStateChanges(ConnectStateChangeListener listener) { return false; } @Override - public boolean subscribeChildChanges(String key, ChildChangeListener listener, - boolean skipWatchingNonExistNode, boolean persistListener) { + public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) { return false; } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 76ac4b0b3..6cccfae72 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -222,7 +222,7 @@ public class TestZkMetaClient { try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { zkMetaClient.connect(); MockDataChangeListener listener = new MockDataChangeListener(); - zkMetaClient.subscribeDataChange(path, listener, false, true); + zkMetaClient.subscribeDataChange(path, listener, false); zkMetaClient.create(path, "test-node"); int expectedCallCount = 0; synchronized (_syncObject) { @@ -260,7 +260,7 @@ public class TestZkMetaClient { } // register a new non-persistent listener try { - zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false); + zkMetaClient.subscribeOneTimeDataChange(path, new MockDataChangeListener(), false); Assert.fail("One-time listener is not supported, NotImplementedException should be thrown."); } catch (NotImplementedException e) { // expected @@ -293,7 +293,7 @@ public class TestZkMetaClient { } }; listeners.get(path).add(listener); - zkMetaClient.subscribeDataChange(path, listener, false, true); + zkMetaClient.subscribeDataChange(path, listener, false); } } zkMetaClient.set(basePath + "_1", testData, -1); @@ -317,7 +317,7 @@ public class TestZkMetaClient { }; zkMetaClient.create(basePath, ""); Assert.assertTrue( - zkMetaClient.subscribeDirectChildChange(basePath, listener, false, true) + zkMetaClient.subscribeDirectChildChange(basePath, listener, false) .isRegistered()); zkMetaClient.create(basePath + "/child_1", "test-data"); //TODO: the native zkclient failed to provide persistent listener, and event might be lost.
