This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 61abe3113 Add an option in metaclient to use persist watcher (#2434)
61abe3113 is described below
commit 61abe311397952f5c2779ba2890ae7466448d205
Author: xyuanlu <[email protected]>
AuthorDate: Mon Apr 10 09:09:39 2023 -0700
Add an option in metaclient to use persist watcher (#2434)
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 2 +-
.../helix/zookeeper/impl/client/ZkClient.java | 21 ++++++++++++++-------
.../apache/helix/zookeeper/zkclient/ZkClient.java | 6 ++++--
3 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 84c329fe4..af31423d0 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -91,7 +91,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
new ZkConnection(config.getConnectionAddress(), (int)
config.getSessionTimeoutInMillis()),
(int) _initConnectionTimeout, _reconnectTimeout /*use reconnect
timeout for retry timeout*/,
config.getZkSerializer(), config.getMonitorType(),
config.getMonitorKey(),
- config.getMonitorInstanceName(), config.getMonitorRootPathOnly(),
false);
+ config.getMonitorInstanceName(), config.getMonitorRootPathOnly(),
false, true);
_zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
_reconnectStateChangeListener = new ReconnectStateChangeListener();
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
index bd76595fa..f388c9083 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java
@@ -89,19 +89,19 @@ public class ZkClient extends
org.apache.helix.zookeeper.zkclient.ZkClient imple
* explicitly before talking to ZK.
*/
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
- PathBasedZkSerializer zkSerializer,
- String monitorType, String monitorKey, String monitorInstanceName,
- boolean monitorRootPathOnly, boolean connectOnInit) {
+ PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey,
+ String monitorInstanceName, boolean monitorRootPathOnly, boolean
connectOnInit,
+ boolean usePersistWatcher) {
super(zkConnection, connectionTimeout, operationRetryTimeout,
zkSerializer, monitorType,
- monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit);
+ monitorKey, monitorInstanceName, monitorRootPathOnly, connectOnInit,
usePersistWatcher);
}
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
PathBasedZkSerializer zkSerializer,
String monitorType, String monitorKey, String monitorInstanceName,
boolean monitorRootPathOnly) {
- this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer,
monitorType, monitorKey,
- monitorInstanceName, monitorRootPathOnly, true);
+ this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer,
monitorType,
+ monitorKey, monitorInstanceName, monitorRootPathOnly, true, false);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -198,6 +198,7 @@ public class ZkClient extends
org.apache.helix.zookeeper.zkclient.ZkClient imple
String _monitorInstanceName = null;
boolean _monitorRootPathOnly = true;
boolean _connectOnInit = true;
+ boolean _usePersistWatcher = false;
/**
* If set true, the client will connect to ZK during initialization.
@@ -276,6 +277,11 @@ public class ZkClient extends
org.apache.helix.zookeeper.zkclient.ZkClient imple
return this;
}
+ public Builder setUsePersistWatcher(boolean usePersistWatcher) {
+ this._usePersistWatcher = usePersistWatcher;
+ return this;
+ }
+
public ZkClient build() {
if (_connection == null) {
if (_zkServer == null) {
@@ -291,7 +297,8 @@ public class ZkClient extends
org.apache.helix.zookeeper.zkclient.ZkClient imple
}
return new ZkClient(_connection, _connectionTimeout,
_operationRetryTimeout, _zkSerializer,
- _monitorType, _monitorKey, _monitorInstanceName,
_monitorRootPathOnly, _connectOnInit);
+ _monitorType, _monitorKey, _monitorInstanceName,
_monitorRootPathOnly, _connectOnInit,
+ _usePersistWatcher);
}
}
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index dbb73864b..a8fb3ede2 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -133,6 +133,7 @@ public class ZkClient implements Watcher {
private volatile boolean _closed;
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;
+ private boolean _usePersistWatcher;
// To automatically retry the async operation, we need a separate thread
other than the
// ZkEventThread. Otherwise the retry request might block the normal event
processing.
@@ -216,7 +217,7 @@ public class ZkClient implements Watcher {
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey,
- String monitorInstanceName, boolean monitorRootPathOnly, boolean
connectOnInit) {
+ String monitorInstanceName, boolean monitorRootPathOnly, boolean
connectOnInit, boolean usePersistWatcher) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
@@ -245,13 +246,14 @@ public class ZkClient implements Watcher {
if (connectOnInit) {
connect(connectionTimeout, this);
}
+ _usePersistWatcher = usePersistWatcher;
}
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey,
String monitorInstanceName, boolean monitorRootPathOnly) {
this(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer,
monitorType, monitorKey,
- monitorInstanceName, monitorRootPathOnly, true);
+ monitorInstanceName, monitorRootPathOnly, true, false);
}
public List<String> subscribeChildChanges(String path, IZkChildListener
listener) {