This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1e0914aaa55c0c697e790385888564bbb6fe876e Author: xyuanlu <[email protected]> AuthorDate: Mon Apr 10 09:09:39 2023 -0700 Add an option in metaclient to use persist watcher (#2434) --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 2 +- .../helix/zookeeper/impl/client/ZkClient.java | 21 ++++++++++++++------- .../apache/helix/zookeeper/zkclient/ZkClient.java | 6 ++++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 84c329fe4..af31423d0 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -91,7 +91,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/, config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), - config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false); + config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true); _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor(); _reconnectStateChangeListener = new ReconnectStateChangeListener(); } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java index f3742b55b..1dd601c21 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java @@ -91,19 +91,19 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple * explicitly before talking to ZK. */ public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, - PathBasedZkSerializer zkSerializer, - String monitorType, String monitorKey, String monitorInstanceName, - boolean monitorRootPathOnly, boolean connectOnInit) { + PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, + String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit, + boolean usePersistWatcher) { super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, - monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit); + monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit, usePersistWatcher); } public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly) { - this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey, - monitorInstanceName, monitorRootPathOnly, true); + this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, + monitorKey, monitorInstanceName, monitorRootPathOnly, true, false); } public ZkClient(IZkConnection connection, int connectionTimeout, @@ -200,6 +200,7 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple String _monitorInstanceName = null; boolean _monitorRootPathOnly = true; boolean _connectOnInit = true; + boolean _usePersistWatcher = false; /** * If set true, the client will connect to ZK during initialization. @@ -278,6 +279,11 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple return this; } + public Builder setUsePersistWatcher(boolean usePersistWatcher) { + this._usePersistWatcher = usePersistWatcher; + return this; + } + public ZkClient build() { if (_connection == null) { if (_zkServer == null) { @@ -293,7 +299,8 @@ public class ZkClient extends org.apache.helix.zookeeper.zkclient.ZkClient imple } return new ZkClient(_connection, _connectionTimeout, _operationRetryTimeout, _zkSerializer, - _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit); + _monitorType, _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _connectOnInit, + _usePersistWatcher); } } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index dbb73864b..a8fb3ede2 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -133,6 +133,7 @@ public class ZkClient implements Watcher { private volatile boolean _closed; private PathBasedZkSerializer _pathBasedZkSerializer; private ZkClientMonitor _monitor; + private boolean _usePersistWatcher; // To automatically retry the async operation, we need a separate thread other than the // ZkEventThread. Otherwise the retry request might block the normal event processing. @@ -216,7 +217,7 @@ public class ZkClient implements Watcher { protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, - String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit) { + String monitorInstanceName, boolean monitorRootPathOnly, boolean connectOnInit, boolean usePersistWatcher) { if (zkConnection == null) { throw new NullPointerException("Zookeeper connection is null!"); } @@ -245,13 +246,14 @@ public class ZkClient implements Watcher { if (connectOnInit) { connect(connectionTimeout, this); } + _usePersistWatcher = usePersistWatcher; } protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly) { this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, monitorKey, - monitorInstanceName, monitorRootPathOnly, true); + monitorInstanceName, monitorRootPathOnly, true, false); } public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
