This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 22144a78b Use persist watcher for listener registration in ZkClient
(when configured) (#2432)
22144a78b is described below
commit 22144a78b09a64ef579309f289a4331c9cf90c1d
Author: xyuanlu <[email protected]>
AuthorDate: Tue Apr 25 09:29:54 2023 -0700
Use persist watcher for listener registration in ZkClient (when configured)
(#2432)
Before ZK 3.6, all ZK watchers were one-time watchers, so ZkClient had to
re-subscribe every time an event happened.
This change makes ZkClient use ZK persistent watchers instead, when it is
configured to do so.
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 2 +-
.../apache/helix/metaclient/impl/zk/TestUtil.java | 96 ++++++++++++++
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 73 +++++++++--
.../helix/zookeeper/zkclient/IZkConnection.java | 5 +-
.../apache/helix/zookeeper/zkclient/ZkClient.java | 142 +++++++++++++++++++--
.../helix/zookeeper/zkclient/ZkConnection.java | 16 +--
.../impl/client/TestZkClientPersistWatcher.java | 111 ++++++++++++++++
7 files changed, 414 insertions(+), 31 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 984766254..6888ee4ed 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -315,7 +315,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public boolean subscribeDataChange(String key, DataChangeListener listener,
boolean skipWatchingNonExistNode) {
_zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener)); // NOTE(review): skipWatchingNonExistNode is not forwarded to this call - confirm that is intended
- return false;
+ return true; // reported unconditionally; no outcome of the call above is consulted
}
@Override
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
new file mode 100644
index 000000000..1296a72f3
--- /dev/null
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -0,0 +1,96 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class TestUtil {
+
+  // Names of the watch-registry fields read reflectively from the ZooKeeper client's watch manager.
+  // Each name is also used as the key of the map returned by getZkWatch.
+  private static final String[] WATCH_FIELD_NAMES = {
+      "dataWatches", "existWatches", "childWatches", "persistentWatches",
+      "persistentRecursiveWatches"
+  };
+
+  /**
+   * Looks up a declared field by name, starting at {@code clazz} and walking up the superclass
+   * chain until a class declaring it is found.
+   *
+   * @param clazz class at which the lookup starts
+   * @param fieldName name of the field to locate
+   * @return the field declared by {@code clazz} or by its nearest superclass declaring it
+   * @throws NoSuchFieldException if no class in the chain declares {@code fieldName}
+   */
+  static java.lang.reflect.Field getField(Class<?> clazz, String fieldName)
+      throws NoSuchFieldException {
+    try {
+      return clazz.getDeclaredField(fieldName);
+    } catch (NoSuchFieldException e) {
+      Class<?> superClass = clazz.getSuperclass();
+      if (superClass == null) {
+        // Reached the top of the hierarchy without finding the field.
+        throw e;
+      }
+      return getField(superClass, fieldName);
+    }
+  }
+
+  /**
+   * Returns the paths currently held in each watch registry of the ZooKeeper handle behind the
+   * given client. The registries are read through reflection.
+   *
+   * @param client client to inspect; it is cast to {@link ZkClient} and its connection to
+   *               {@link ZkConnection}
+   * @return a map from registry name ("dataWatches", "existWatches", "childWatches",
+   *         "persistentWatches", "persistentRecursiveWatches") to the list of watched paths
+   * @throws Exception if a reflective lookup or access fails
+   */
+  public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client)
+      throws Exception {
+    ZkClient zkClient = (ZkClient) client;
+    ZkConnection connection = (ZkConnection) zkClient.getConnection();
+    ZooKeeper zk = connection.getZookeeper();
+
+    java.lang.reflect.Field watchManagerField = getField(zk.getClass(), "watchManager");
+    watchManagerField.setAccessible(true);
+    Object watchManager = watchManagerField.get(zk);
+
+    Map<String, List<String>> lists = new HashMap<>();
+    for (String watchFieldName : WATCH_FIELD_NAMES) {
+      lists.put(watchFieldName, new ArrayList<>(readWatchedPaths(watchManager, watchFieldName)));
+    }
+    return lists;
+  }
+
+  /**
+   * Reads one watch registry from the watch manager and returns the paths it is keyed by.
+   */
+  @SuppressWarnings("unchecked") // the field type is only known at runtime; this code treats it as path -> watchers
+  private static Set<String> readWatchedPaths(Object watchManager, String fieldName)
+      throws Exception {
+    java.lang.reflect.Field field = getField(watchManager.getClass(), fieldName);
+    field.setAccessible(true);
+    // Cast to the Map interface so the helper does not depend on the concrete map class.
+    return ((Map<String, Set<Watcher>>) field.get(watchManager)).keySet();
+  }
+}
\ No newline at end of file
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 6eb624fb8..cbefd2156 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -46,6 +46,7 @@ import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static
org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
@@ -323,7 +324,7 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
@Test
public void testDirectChildChangeListener() throws Exception {
final String basePath = "/TestZkMetaClient_testDirectChildChangeListener";
- final int count = 3;
+ final int count = 1000;
try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
zkMetaClient.connect();
CountDownLatch countDownLatch = new CountDownLatch(count);
@@ -337,14 +338,70 @@ public class TestZkMetaClient extends
ZkMetaClientTestBase{
Assert.assertTrue(
zkMetaClient.subscribeDirectChildChange(basePath, listener, false)
.isRegistered());
- zkMetaClient.create(basePath + "/child_1", "test-data");
- //TODO: the native zkclient failed to provide persistent listener, and
event might be lost.
- // Remove Thread.sleep() below when the persistent watcher is supported
- Thread.sleep(500);
- zkMetaClient.create(basePath + "/child_2", "test-data");
- Thread.sleep(500);
- zkMetaClient.create(basePath + "/child_3", "test-data");
+ for(int i=0; i<1000; ++i){
+ zkMetaClient.create(basePath + "/child_" +i, "test-data");
+ }
+ // Verify no one time watcher is registered. Only one persist listener
is registered.
+ Map<String, List<String>> watchers =
TestUtil.getZkWatch(zkMetaClient.getZkClient());
+ Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
+ Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
+ Assert.assertEquals(watchers.get("childWatches").size(), 0);
+ Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+ Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ zkMetaClient.unsubscribeDirectChildChange(basePath, listener);
+ // verify that no listener is registered on any path
+ watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+ Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+ Assert.assertEquals(watchers.get("childWatches").size(), 0);
+ Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+ }
+ }
+
+  /**
+   * Subscribes a data change listener on one node, updates the node {@code count} times and
+   * expects {@code count} ENTRY_UPDATE notifications. Also checks, through TestUtil.getZkWatch,
+   * that exactly one persistent watch (and no child or data watch) exists while subscribed,
+   * and none after unsubscribing.
+   */
+  @Test
+  public void testDataChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testDataChangeListener";
+    final int count = 200;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      // Counted down once per ENTRY_UPDATE notification delivered to the listener.
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      DataChangeListener listener = new DataChangeListener() {
+
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+          if (changeType == ENTRY_UPDATE) {
+            countDownLatch.countDown();
+          }
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(zkMetaClient.subscribeDataChange(basePath, listener, false));
+      // Verify no one time watcher is registered. Only one persist listener is registered.
+      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
+      Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+      // Issue exactly `count` updates so the number of writes always matches the latch size.
+      for (int i = 0; i < count; ++i) {
+        zkMetaClient.set(basePath, "data7" + i, -1);
+      }
       Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+      zkMetaClient.unsubscribeDataChange(basePath, listener);
+      // verify that no listener is registered on any path
+      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
     }
   }
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
index bd94432b4..43eefad26 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
@@ -64,8 +64,7 @@ public interface IZkConnection {
public List<OpResult> multi(Iterable<Op> ops) throws KeeperException,
InterruptedException;
public void addAuthInfo(String scheme, byte[] auth);
-
- public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
throws KeeperException,
-
InterruptedException;
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+ throws KeeperException, InterruptedException;
public void removeWatches(String path, Watcher watcher,
Watcher.WatcherType watcherType) throws InterruptedException, KeeperException;
}
\ No newline at end of file
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index a8fb3ede2..41c23a6d1 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
import javax.management.JMException;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
@@ -59,6 +61,7 @@ import
org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
+import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -103,7 +106,7 @@ public class ZkClient implements Watcher {
private static AtomicLong UID = new AtomicLong(0);
public final long _uid;
- // ZNode write size limit in bytes.
+ // ZNode write size limit in bytes:
// TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+
private static final int WRITE_SIZE_LIMIT =
Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER,
ZNRecord.SIZE_LIMIT);
@@ -133,8 +136,14 @@ public class ZkClient implements Watcher {
private volatile boolean _closed;
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;
+ // When _usePersistWatcher is true, ZkClient registers itself as a persistent watcher with
+ // the ZK server, so no re-registration is needed after a change event.
+ // The default value is false, meaning ZkClient registers itself as a regular one-time
+ // watcher with the ZK server and re-registers after every data or child change.
private boolean _usePersistWatcher;
+ private final ReentrantLock _persistListenerMutex;
+
// To automatically retry the async operation, we need a separate thread
other than the
// ZkEventThread. Otherwise the retry request might block the normal event
processing.
protected final ZkAsyncRetryThread _asyncCallRetryThread;
@@ -247,6 +256,7 @@ public class ZkClient implements Watcher {
connect(connectionTimeout, this);
}
_usePersistWatcher = usePersistWatcher;
+ _persistListenerMutex = new ReentrantLock();
}
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
@@ -262,9 +272,13 @@ public class ZkClient implements Watcher {
}
public ChildrenSubscribeResult subscribeChildChanges(String path,
IZkChildListener listener, boolean skipWatchingNonExistNode) {
+ if (_usePersistWatcher) {
+ addPersistListener(path, listener);
+ } else {
synchronized (_childListener) {
addChildListener(path, listener);
}
+ }
List<String> children = watchForChilds(path, skipWatchingNonExistNode);
if (children == null && skipWatchingNonExistNode) {
@@ -277,15 +291,22 @@ public class ZkClient implements Watcher {
}
public void unsubscribeChildChanges(String path, IZkChildListener
childListener) {
+ if (_usePersistWatcher) { // persist-watcher mode: removal is delegated to removePersistListener
+ removePersistListener(path, childListener);
+ } else { // otherwise: remove the listener while synchronized on _childListener
synchronized (_childListener) {
removeChildListener(path, childListener);
+ }
}
}
public boolean subscribeDataChanges(String path, IZkDataListener listener,
boolean skipWatchingNonExistNode) {
-
+ if (_usePersistWatcher) {
+ addPersistListener(path, listener);
+ } else {
synchronized (_dataListener) {
addDataListener(path, listener);
+ }
}
boolean watchInstalled = watchForData(path, skipWatchingNonExistNode);
@@ -336,9 +357,13 @@ public class ZkClient implements Watcher {
}
public void unsubscribeDataChanges(String path, IZkDataListener
dataListener) {
+ if (_usePersistWatcher) { // persist-watcher mode: removal is delegated to removePersistListener
+ removePersistListener(path, dataListener);
+ } else { // otherwise: remove the listener while synchronized on _dataListener
synchronized (_dataListener) {
removeDataListener(path, dataListener);
}
+ }
}
public void subscribeStateChanges(final IZkStateListener listener) {
@@ -384,6 +409,20 @@ public class ZkClient implements Watcher {
}
public void unsubscribeAll() {
+ if (_usePersistWatcher) {
+ ManipulateListener removeAllListeners = () -> {
+ Stream.concat(_childListener.keySet().stream(),
_dataListener.keySet().stream())
+ .forEach(p -> {
+ try {
+ getConnection().removeWatches(p, this, WatcherType.Any);
+ } catch (InterruptedException | KeeperException e) {
+ LOG.error("Failed to remove persistent watcher for {} ", p, e);
+ }
+ });
+ };
+ executeWithInPersistListenerMutex(removeAllListeners);
+ return;
+ }
synchronized (_childListener) {
_childListener.clear();
}
@@ -1228,6 +1267,10 @@ public class ZkClient implements Watcher {
if (LOG.isDebugEnabled()) {
LOG.debug("zkclient {}, Received event: {} ", _uid, event);
}
+
+ if (event.getType() == EventType.PersistentWatchRemoved) {
+ return;
+ }
_zookeeperEventThread = Thread.currentThread();
boolean stateChanged = event.getPath() == null;
@@ -1313,7 +1356,7 @@ public class ZkClient implements Watcher {
* are deleted before the last page is fetched. The upstream caller should
be able to handle this.
*/
public List<String> getChildren(String path) {
- return getChildren(path, hasListeners(path));
+ return getChildren(path, (!_usePersistWatcher) && hasListeners(path)); // the watch flag is always false when persist watcher is enabled
}
protected List<String> getChildren(final String path, final boolean watch) {
@@ -1768,7 +1811,8 @@ public class ZkClient implements Watcher {
// the exists() useGetData (false) route to check stat.
Otherwise, we use getData()
// to install watch.
Stat stat = null;
- if (!pathExists) {
+ // Do not register a one-time watcher when _usePersistWatcher is true.
+ if (_usePersistWatcher || !pathExists) {
stat = getStat(path, false);
} else {
stat = installWatchOnlyPathExist(path);
@@ -1785,7 +1829,7 @@ public class ZkClient implements Watcher {
}
try {
// TODO: the data is redundantly read multiple times when
multiple listeners exist
- data = readData(path, null, true);
+ data = readData(path, null, !_usePersistWatcher);
} catch (ZkNoNodeException e) {
LOG.warn("zkclient {} Prefetch data for path: {} failed.",
_uid, path, e);
listener.getDataListener().handleDataDeleted(path);
@@ -1812,7 +1856,7 @@ public class ZkClient implements Watcher {
public void run() throws Exception {
if (!pathStatRecord.pathChecked()) {
Stat stat = null;
- if (!pathExists || !hasListeners(path)) {
+ if (_usePersistWatcher || !pathExists || !hasListeners(path)) {
// will not install listener using exists call
stat = getStat(path, false);
} else {
@@ -2379,10 +2423,25 @@ public class ZkClient implements Watcher {
private boolean watchForData(final String path, boolean
skipWatchingNonExistNode) {
try {
- if (skipWatchingNonExistNode) {
- retryUntilConnected(() -> (((ZkConnection)
getConnection()).getZookeeper().getData(path, true, new Stat())));
+ if (_usePersistWatcher) {
+ return retryUntilConnected(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ if (!skipWatchingNonExistNode || exists(path)) {
+ getConnection().addWatch(path, ZkClient.this,
AddWatchMode.PERSISTENT);
+ return true;
+ }
+ return false;
+ }
+ });
} else {
- retryUntilConnected(() -> (((ZkConnection)
getConnection()).getZookeeper().exists(path, true)));
+ if (skipWatchingNonExistNode) {
+ retryUntilConnected(() -> (((ZkConnection)
getConnection()).getZookeeper()
+ .getData(path, true, new Stat())));
+ } else {
+ retryUntilConnected(
+ () -> (((ZkConnection)
getConnection()).getZookeeper().exists(path, true)));
+ }
}
} catch (ZkNoNodeException e) {
// Do nothing, this is what we want as this is not going to leak watch
in ZooKeeepr server.
@@ -2434,11 +2493,20 @@ public class ZkClient implements Watcher {
return retryUntilConnected(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
- if (!skipWatchingNonExistNode) {
+ // Register a one-time watcher without checking whether the path exists only
+ // when both _usePersistWatcher and skipWatchingNonExistNode are false.
+ if (!skipWatchingNonExistNode && !_usePersistWatcher) {
exists(path, true);
}
try {
- return getChildren(path, true);
+ if (_usePersistWatcher) {
+ if (!skipWatchingNonExistNode || exists(path)) {
+ getConnection().addWatch(path, ZkClient.this,
AddWatchMode.PERSISTENT);
+ }
+ return getChildren(path, false);
+ } else {
+ return getChildren(path, true);
+ }
} catch (ZkNoNodeException e) {
// ignore, the "exists" watch will listen for the parent node to
appear
LOG.info("zkclient{} watchForChilds path not existing:{}
skipWatchingNodeNoteExist: {}",
@@ -2927,4 +2995,56 @@ public class ZkClient implements Watcher {
listeners.remove(listener);
}
}
+
+  /**
+   * An action that adds or removes listeners and may call ZooKeeper while doing so, which is
+   * why it is allowed to throw ZooKeeper's checked exceptions. It has a single abstract method
+   * so that it can be written as a lambda.
+   */
+  @FunctionalInterface
+  interface ManipulateListener {
+    /**
+     * Performs the action.
+     *
+     * @throws KeeperException if a ZooKeeper operation issued by the action fails
+     * @throws InterruptedException if the calling thread is interrupted during the action
+     */
+    void run() throws KeeperException, InterruptedException;
+  }
+
+  /**
+   * Registers {@code listener} for {@code path} while holding the persist listener mutex.
+   * An IZkChildListener is added as a child listener; otherwise an IZkDataListener is added
+   * as a data listener. Any other object is ignored.
+   *
+   * @param path path the listener is interested in
+   * @param listener an IZkChildListener or IZkDataListener instance
+   */
+  private void addPersistListener(String path, Object listener) {
+    executeWithInPersistListenerMutex(() -> {
+      // The child-listener check comes first, so an object implementing both interfaces
+      // is registered as a child listener only.
+      if (listener instanceof IZkChildListener) {
+        addChildListener(path, (IZkChildListener) listener);
+        return;
+      }
+      if (listener instanceof IZkDataListener) {
+        addDataListener(path, (IZkDataListener) listener);
+      }
+    });
+  }
+
+
+  // TODO: Consider create an empty interface and let the two listeners interface extend that
+  // interface for code clean.
+  /**
+   * Unregisters {@code listener} for {@code path} while holding the persist listener mutex.
+   * Once the path has no listeners left, the watch this client holds on the path is removed
+   * from the connection. A missing watch is logged and otherwise ignored.
+   *
+   * @param path path the listener was registered on
+   * @param listener an IZkChildListener or IZkDataListener instance
+   */
+  private void removePersistListener(String path, Object listener) {
+    executeWithInPersistListenerMutex(() -> {
+      try {
+        if (listener instanceof IZkChildListener) {
+          removeChildListener(path, (IZkChildListener) listener);
+        } else if (listener instanceof IZkDataListener) {
+          removeDataListener(path, (IZkDataListener) listener);
+        }
+        if (hasListeners(path)) {
+          // Other listeners still depend on the watch for this path; keep it.
+          return;
+        }
+        // TODO: update hasListeners logic when recursive persist listener is added
+        getConnection().removeWatches(path, ZkClient.this, WatcherType.Any);
+      } catch (KeeperException.NoWatcherException alreadyRemoved) {
+        LOG.warn("Persist watcher is already removed");
+      }
+    });
+  }
+
+  /**
+   * Runs {@code runnable} while holding {@code _persistListenerMutex}.
+   *
+   * The lock is acquired before entering the try/finally that releases it, so an interrupt
+   * received while waiting for the lock never leads to an unlock of a lock that is not held.
+   *
+   * @param runnable the listener manipulation to run under the mutex
+   * @throws ZkException if the thread is interrupted (the interrupt flag is restored first) or
+   *         if the action fails with a KeeperException other than NoWatcherException, which
+   *         is only logged
+   */
+  private void executeWithInPersistListenerMutex(ManipulateListener runnable) {
+    try {
+      _persistListenerMutex.lockInterruptibly();
+    } catch (InterruptedException ex) {
+      // The lock was not acquired: do not unlock, just propagate the interruption.
+      Thread.currentThread().interrupt();
+      throw new ZkException(ex);
+    }
+    try {
+      runnable.run();
+    } catch (KeeperException.NoWatcherException e) {
+      LOG.warn("Persist watcher is already removed");
+    } catch (InterruptedException ex) {
+      // Preserve the interrupt status for code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new ZkException(ex);
+    } catch (KeeperException ex) {
+      throw new ZkException(ex);
+    } finally {
+      _persistListenerMutex.unlock();
+    }
+  }
}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 2f7ac27de..376409231 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -245,14 +245,6 @@ public class ZkConnection implements IZkConnection {
_zk.addAuthInfo(scheme, auth);
}
- private void lookupGetChildrenMethod() {
- _getChildrenMethod = doLookUpGetChildrenMethod();
-
- LOG.info("Pagination config {}={}, method to be invoked: {}",
- ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED,
GETCHILDREN_PAGINATION_DISABLED,
- _getChildrenMethod.getName());
- }
-
@Override
public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
throws KeeperException, InterruptedException {
@@ -265,6 +257,14 @@ public class ZkConnection implements IZkConnection {
_zk.removeWatches(path, watcher, watcherType, true);
}
+  /**
+   * Stores the method returned by doLookUpGetChildrenMethod() in _getChildrenMethod and logs
+   * the pagination setting together with the name of the chosen method.
+   */
+  private void lookupGetChildrenMethod() {
+    final Method resolved = doLookUpGetChildrenMethod();
+    _getChildrenMethod = resolved;
+
+    final String resolvedName = resolved.getName();
+    LOG.info("Pagination config {}={}, method to be invoked: {}",
+        ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED,
+        resolvedName);
+  }
+
private Method doLookUpGetChildrenMethod() {
if (!GETCHILDREN_PAGINATION_DISABLED) {
try {
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
new file mode 100644
index 000000000..1a13fc3ba
--- /dev/null
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
@@ -0,0 +1,111 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.impl.ZkTestHelper;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests data and child change notifications of a ZkClient built with
+ * setUsePersistWatcher(true).
+ */
+public class TestZkClientPersistWatcher extends ZkTestBase {
+
+  /**
+   * Builds a ZkClient for the test ZK address with persist watcher enabled and a
+   * SerializableSerializer-based serializer installed.
+   */
+  private static ZkClient createPersistWatcherClient() {
+    ZkClient.Builder builder = new ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR)
+        .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
+    ZkClient zkClient = builder.build();
+    zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
+    return zkClient;
+  }
+
+  /**
+   * Subscribes a data listener, creates the node and writes it {@code count} times, then
+   * expects the listener to have seen at least {@code count} data changes within the timeout.
+   */
+  @Test
+  void testZkClientDataChange() throws Exception {
+    ZkClient zkClient = createPersistWatcherClient();
+    // Close the client even when an assertion fails so the connection is not leaked.
+    try {
+      int count = 1000;
+      CountDownLatch countDownLatch1 = new CountDownLatch(count);
+      String path = "/dataChangeTestPath";
+      IZkDataListener dataListener = new IZkDataListener() {
+        @Override
+        public void handleDataChange(String dataPath, Object data) throws Exception {
+          countDownLatch1.countDown();
+        }
+
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+        }
+      };
+
+      zkClient.subscribeDataChanges(path, dataListener);
+      zkClient.create(path, "datat", CreateMode.PERSISTENT);
+      for (int i = 0; i < count; ++i) {
+        zkClient.writeData(path, ("datat" + i), -1);
+      }
+
+      Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Subscribes two child listeners on the same path, creates the path and {@code count}
+   * children, then expects each listener to have seen at least {@code count} child changes
+   * within the timeout.
+   */
+  @Test(dependsOnMethods = "testZkClientDataChange")
+  void testZkClientChildChange() throws Exception {
+    ZkClient zkClient = createPersistWatcherClient();
+    // Close the client even when an assertion fails so the connection is not leaked.
+    try {
+      int count = 100;
+      CountDownLatch countDownLatch1 = new CountDownLatch(count);
+      CountDownLatch countDownLatch2 = new CountDownLatch(count);
+      String path = "/testZkClientChildChange";
+      IZkChildListener childListener = new IZkChildListener() {
+        @Override
+        public void handleChildChange(String parentPath, List<String> currentChilds)
+            throws Exception {
+          countDownLatch1.countDown();
+        }
+      };
+      IZkChildListener childListener2 = new IZkChildListener() {
+        @Override
+        public void handleChildChange(String parentPath, List<String> currentChilds)
+            throws Exception {
+          countDownLatch2.countDown();
+        }
+      };
+      zkClient.subscribeChildChanges(path, childListener);
+      zkClient.subscribeChildChanges(path, childListener2);
+      zkClient.create(path, "datat", CreateMode.PERSISTENT);
+      for (int i = 0; i < count; ++i) {
+        zkClient.create(path + "/child" + i, "datat", CreateMode.PERSISTENT);
+      }
+      Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
+    } finally {
+      zkClient.close();
+    }
+  }
+
+}
\ No newline at end of file