This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 22144a78b Use persist watcher for listener registration in ZkClient 
(when configured) (#2432)
22144a78b is described below

commit 22144a78b09a64ef579309f289a4331c9cf90c1d
Author: xyuanlu <[email protected]>
AuthorDate: Tue Apr 25 09:29:54 2023 -0700

    Use persist watcher for listener registration in ZkClient (when configured) 
(#2432)
    
    Before ZK 3.6, all ZK watchers were one-time watchers, so ZkClient had to 
resubscribe every time an event happened.
    This change adopts the ZK persistent watcher.
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |   2 +-
 .../apache/helix/metaclient/impl/zk/TestUtil.java  |  96 ++++++++++++++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  73 +++++++++--
 .../helix/zookeeper/zkclient/IZkConnection.java    |   5 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 142 +++++++++++++++++++--
 .../helix/zookeeper/zkclient/ZkConnection.java     |  16 +--
 .../impl/client/TestZkClientPersistWatcher.java    | 111 ++++++++++++++++
 7 files changed, 414 insertions(+), 31 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 984766254..6888ee4ed 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -315,7 +315,7 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
   @Override
   public boolean subscribeDataChange(String key, DataChangeListener listener, 
boolean skipWatchingNonExistNode) {
     _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
-    return false;
+    return true;
   }
 
   @Override
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
new file mode 100644
index 000000000..1296a72f3
--- /dev/null
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -0,0 +1,96 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class TestUtil {
+
+  static java.lang.reflect.Field getField(Class clazz, String fieldName)
+      throws NoSuchFieldException {
+    try {
+      return clazz.getDeclaredField(fieldName);
+    } catch (NoSuchFieldException e) {
+      Class superClass = clazz.getSuperclass();
+      if (superClass == null) {
+        throw e;
+      }
+      return getField(superClass, fieldName);
+    }
+  }
+
+  public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client)
+      throws Exception {
+    Map<String, List<String>> lists = new HashMap<String, List<String>>();
+    ZkClient zkClient = (ZkClient) client;
+
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+    ZooKeeper zk = connection.getZookeeper();
+
+    java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
+    field.setAccessible(true);
+    Object watchManager = field.get(zk);
+
+    java.lang.reflect.Field field2 = getField(watchManager.getClass(), 
"dataWatches");
+    field2.setAccessible(true);
+    HashMap<String, Set<Watcher>> dataWatches =
+        (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+    field2 = getField(watchManager.getClass(), "existWatches");
+    field2.setAccessible(true);
+    HashMap<String, Set<Watcher>> existWatches =
+        (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+    field2 = getField(watchManager.getClass(), "childWatches");
+    field2.setAccessible(true);
+    HashMap<String, Set<Watcher>> childWatches =
+        (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+    field2 = getField(watchManager.getClass(), "persistentWatches");
+    field2.setAccessible(true);
+    HashMap<String, Set<Watcher>> persistentWatches =
+        (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+    field2 = getField(watchManager.getClass(), "persistentRecursiveWatches");
+    field2.setAccessible(true);
+    HashMap<String, Set<Watcher>> persistentRecursiveWatches =
+        (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+
+    lists.put("dataWatches", new ArrayList<>(dataWatches.keySet()));
+    lists.put("existWatches", new ArrayList<>(existWatches.keySet()));
+    lists.put("childWatches", new ArrayList<>(childWatches.keySet()));
+    lists.put("persistentWatches", new 
ArrayList<>(persistentWatches.keySet()));
+    lists.put("persistentRecursiveWatches", new 
ArrayList<>(persistentRecursiveWatches.keySet()));
+
+    return lists;
+  }
+}
\ No newline at end of file
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 6eb624fb8..cbefd2156 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -46,6 +46,7 @@ import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static 
org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
 import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
 import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
 
@@ -323,7 +324,7 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
   @Test
   public void testDirectChildChangeListener() throws Exception {
     final String basePath = "/TestZkMetaClient_testDirectChildChangeListener";
-    final int count = 3;
+    final int count = 1000;
     try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
       zkMetaClient.connect();
       CountDownLatch countDownLatch = new CountDownLatch(count);
@@ -337,14 +338,70 @@ public class TestZkMetaClient extends 
ZkMetaClientTestBase{
       Assert.assertTrue(
           zkMetaClient.subscribeDirectChildChange(basePath, listener, false)
               .isRegistered());
-      zkMetaClient.create(basePath + "/child_1", "test-data");
-      //TODO: the native zkclient failed to provide persistent listener, and 
event might be lost.
-      // Remove Thread.sleep() below when the persistent watcher is supported
-      Thread.sleep(500);
-      zkMetaClient.create(basePath + "/child_2", "test-data");
-      Thread.sleep(500);
-      zkMetaClient.create(basePath + "/child_3", "test-data");
+      for(int i=0; i<1000; ++i){
+        zkMetaClient.create(basePath + "/child_" +i, "test-data");
+      }
+      // Verify no one time watcher is registered. Only one persist listener 
is registered.
+      Map<String, List<String>> watchers = 
TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
+      Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+      zkMetaClient.unsubscribeDirectChildChange(basePath, listener);
+      // verify that no listener is registered on any path
+      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+    }
+  }
+
+  @Test
+  public void testDataChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testDataChangeListener";
+    final int count = 200;
+    final int[] get_count = {0};
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      DataChangeListener listener = new DataChangeListener() {
+
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType 
changeType)
+            throws Exception {
+          if(changeType == ENTRY_UPDATE) {
+            get_count[0]++;
+            countDownLatch.countDown();
+          }
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeDataChange(basePath, listener, false)
+      );
+      // Verify no one time watcher is registered. Only one persist listener 
is registered.
+      Map<String, List<String>> watchers = 
TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
+      Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+      for (int i=0; i<200; ++i) {
+        zkMetaClient.set(basePath, "data7" + i, -1);
+      }
       Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+
+      zkMetaClient.unsubscribeDataChange(basePath, listener);
+      // verify that no listener is registered on any path
+      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
     }
   }
 
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
index bd94432b4..43eefad26 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java
@@ -64,8 +64,7 @@ public interface IZkConnection {
     public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, 
InterruptedException;
 
     public void addAuthInfo(String scheme, byte[] auth);
-
-    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) 
throws KeeperException,
-                                                                               
      InterruptedException;
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+        throws KeeperException, InterruptedException;
     public void removeWatches(String path, Watcher watcher, 
Watcher.WatcherType watcherType) throws InterruptedException, KeeperException;
 }
\ No newline at end of file
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index a8fb3ede2..41c23a6d1 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
 import javax.management.JMException;
 
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
@@ -59,6 +61,7 @@ import 
org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -103,7 +106,7 @@ public class ZkClient implements Watcher {
   private static AtomicLong UID = new AtomicLong(0);
   public final long _uid;
 
-  // ZNode write size limit in bytes.
+  // ZNode write size limit in bytes:
   // TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+
   private static final int WRITE_SIZE_LIMIT =
       Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, 
ZNRecord.SIZE_LIMIT);
@@ -133,8 +136,14 @@ public class ZkClient implements Watcher {
   private volatile boolean _closed;
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
+  // When _usePersistWatcher is true, ZKClient will register itself as persist 
watcher to Zk Server.
+  // No re-register is needed after change event.
+  // Default value is false, meaning ZKClient will register itself as a regular 
one-time watcher to
+  // Zk Server and will re-register after a data or child change.
   private boolean _usePersistWatcher;
 
+  private final ReentrantLock _persistListenerMutex;
+
   // To automatically retry the async operation, we need a separate thread 
other than the
   // ZkEventThread. Otherwise the retry request might block the normal event 
processing.
   protected final ZkAsyncRetryThread _asyncCallRetryThread;
@@ -247,6 +256,7 @@ public class ZkClient implements Watcher {
       connect(connectionTimeout, this);
     }
     _usePersistWatcher = usePersistWatcher;
+    _persistListenerMutex = new ReentrantLock();
   }
 
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long 
operationRetryTimeout,
@@ -262,9 +272,13 @@ public class ZkClient implements Watcher {
   }
 
   public ChildrenSubscribeResult subscribeChildChanges(String path, 
IZkChildListener listener, boolean skipWatchingNonExistNode) {
+    if (_usePersistWatcher) {
+      addPersistListener(path, listener);
+    } else {
       synchronized (_childListener) {
         addChildListener(path, listener);
       }
+    }
 
     List<String> children = watchForChilds(path, skipWatchingNonExistNode);
     if (children == null && skipWatchingNonExistNode) {
@@ -277,15 +291,22 @@ public class ZkClient implements Watcher {
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener 
childListener) {
+    if (_usePersistWatcher) {
+      removePersistListener(path, childListener);
+    } else {
       synchronized (_childListener) {
         removeChildListener(path, childListener);
+      }
     }
   }
 
   public boolean subscribeDataChanges(String path, IZkDataListener listener, 
boolean skipWatchingNonExistNode) {
-
+    if (_usePersistWatcher) {
+      addPersistListener(path, listener);
+    } else {
       synchronized (_dataListener) {
         addDataListener(path, listener);
+      }
     }
 
     boolean watchInstalled = watchForData(path, skipWatchingNonExistNode);
@@ -336,9 +357,13 @@ public class ZkClient implements Watcher {
   }
 
   public void unsubscribeDataChanges(String path, IZkDataListener 
dataListener) {
+    if (_usePersistWatcher) {
+      removePersistListener(path, dataListener);
+    } else {
       synchronized (_dataListener) {
         removeDataListener(path, dataListener);
       }
+    }
   }
 
   public void subscribeStateChanges(final IZkStateListener listener) {
@@ -384,6 +409,20 @@ public class ZkClient implements Watcher {
   }
 
   public void unsubscribeAll() {
+    if (_usePersistWatcher) {
+      ManipulateListener removeAllListeners = () -> {
+        Stream.concat(_childListener.keySet().stream(), 
_dataListener.keySet().stream())
+            .forEach(p -> {
+              try {
+                getConnection().removeWatches(p, this, WatcherType.Any);
+              } catch (InterruptedException | KeeperException e) {
+                LOG.error("Failed to remove persistent watcher for {} ", p, e);
+              }
+            });
+      };
+      executeWithInPersistListenerMutex(removeAllListeners);
+      return;
+    }
     synchronized (_childListener) {
       _childListener.clear();
     }
@@ -1228,6 +1267,10 @@ public class ZkClient implements Watcher {
     if (LOG.isDebugEnabled()) {
       LOG.debug("zkclient {}, Received event: {} ", _uid, event);
     }
+
+    if (event.getType() == EventType.PersistentWatchRemoved) {
+      return;
+    }
     _zookeeperEventThread = Thread.currentThread();
 
     boolean stateChanged = event.getPath() == null;
@@ -1313,7 +1356,7 @@ public class ZkClient implements Watcher {
    * are deleted before the last page is fetched. The upstream caller should 
be able to handle this.
    */
   public List<String> getChildren(String path) {
-    return getChildren(path, hasListeners(path));
+    return getChildren(path, (!_usePersistWatcher) && hasListeners(path));
   }
 
   protected List<String> getChildren(final String path, final boolean watch) {
@@ -1768,7 +1811,8 @@ public class ZkClient implements Watcher {
               // the exists() useGetData (false) route to check stat. 
Otherwise, we use getData()
               // to install watch.
               Stat stat = null;
-              if (!pathExists) {
+              // Do not register a one-time watcher when _usePersistWatcher is true.
+              if (_usePersistWatcher || !pathExists) {
                 stat = getStat(path, false);
               } else {
                 stat = installWatchOnlyPathExist(path);
@@ -1785,7 +1829,7 @@ public class ZkClient implements Watcher {
                 }
                 try {
                   // TODO: the data is redundantly read multiple times when 
multiple listeners exist
-                  data = readData(path, null, true);
+                  data = readData(path, null, !_usePersistWatcher);
                 } catch (ZkNoNodeException e) {
                   LOG.warn("zkclient {} Prefetch data for path: {} failed.", 
_uid, path, e);
                   listener.getDataListener().handleDataDeleted(path);
@@ -1812,7 +1856,7 @@ public class ZkClient implements Watcher {
           public void run() throws Exception {
             if (!pathStatRecord.pathChecked()) {
               Stat stat = null;
-              if (!pathExists || !hasListeners(path)) {
+              if (_usePersistWatcher || !pathExists || !hasListeners(path)) {
                 // will not install listener using exists call
                 stat = getStat(path, false);
               } else {
@@ -2379,10 +2423,25 @@ public class ZkClient implements Watcher {
 
   private boolean watchForData(final String path, boolean 
skipWatchingNonExistNode) {
     try {
-      if (skipWatchingNonExistNode) {
-        retryUntilConnected(() -> (((ZkConnection) 
getConnection()).getZookeeper().getData(path, true, new Stat())));
+      if (_usePersistWatcher) {
+        return retryUntilConnected(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            if (!skipWatchingNonExistNode || exists(path)) {
+              getConnection().addWatch(path, ZkClient.this, 
AddWatchMode.PERSISTENT);
+              return true;
+            }
+            return false;
+          }
+        });
       } else {
-        retryUntilConnected(() -> (((ZkConnection) 
getConnection()).getZookeeper().exists(path, true)));
+        if (skipWatchingNonExistNode) {
+          retryUntilConnected(() -> (((ZkConnection) 
getConnection()).getZookeeper()
+              .getData(path, true, new Stat())));
+        } else {
+          retryUntilConnected(
+              () -> (((ZkConnection) 
getConnection()).getZookeeper().exists(path, true)));
+        }
       }
     } catch (ZkNoNodeException e) {
       // Do nothing, this is what we want as this is not going to leak watch 
in ZooKeeepr server.
@@ -2434,11 +2493,20 @@ public class ZkClient implements Watcher {
     return retryUntilConnected(new Callable<List<String>>() {
       @Override
       public List<String> call() throws Exception {
-        if (!skipWatchingNonExistNode) {
+        // We only register a one-time watcher without checking if the path exists
+        // when _usePersistWatcher is false and skipWatchingNonExistNode is 
false.
+        if (!skipWatchingNonExistNode && !_usePersistWatcher) {
           exists(path, true);
         }
         try {
-          return getChildren(path, true);
+          if (_usePersistWatcher) {
+            if (!skipWatchingNonExistNode || exists(path)) {
+              getConnection().addWatch(path, ZkClient.this, 
AddWatchMode.PERSISTENT);
+            }
+            return getChildren(path, false);
+          } else {
+            return getChildren(path, true);
+          }
         } catch (ZkNoNodeException e) {
           // ignore, the "exists" watch will listen for the parent node to 
appear
           LOG.info("zkclient{} watchForChilds path not existing:{} 
skipWatchingNodeNoteExist: {}",
@@ -2927,4 +2995,56 @@ public class ZkClient implements Watcher {
       listeners.remove(listener);
     }
   }
+
+  interface ManipulateListener {
+    void run() throws KeeperException, InterruptedException;
+  }
+
+  private void addPersistListener(String path, Object listener) {
+    ManipulateListener addListeners = () -> {
+      if (listener instanceof IZkChildListener) {
+        addChildListener(path, (IZkChildListener) listener);
+      } else if (listener instanceof IZkDataListener) {
+        addDataListener(path, (IZkDataListener) listener);
+      }
+    };
+    executeWithInPersistListenerMutex(addListeners);
+  }
+
+
+  // TODO: Consider create an empty interface and let the two listeners 
interface extend that
+  // interface for code clean.
+  private void removePersistListener(String path, Object listener) {
+
+    ManipulateListener removeListeners = () -> {
+      try {
+        if (listener instanceof IZkChildListener) {
+          removeChildListener(path, (IZkChildListener) listener);
+        } else if (listener instanceof IZkDataListener) {
+          removeDataListener(path, (IZkDataListener) listener);
+        }
+        if (!hasListeners(path)) {
+          // TODO: update hasListeners logic when recursive persist listener 
is added
+          getConnection().removeWatches(path, this, WatcherType.Any);
+        }
+      } catch (KeeperException.NoWatcherException e) {
+        LOG.warn("Persist watcher is already removed");
+      }
+    };
+
+    executeWithInPersistListenerMutex(removeListeners);
+  }
+
+  private void executeWithInPersistListenerMutex(ManipulateListener runnable) {
+    try {
+      _persistListenerMutex.lockInterruptibly();
+      runnable.run();
+    } catch (KeeperException.NoWatcherException e) {
+      LOG.warn("Persist watcher is already removed");
+    } catch (KeeperException | InterruptedException ex) {
+      throw new ZkException(ex);
+    } finally {
+      _persistListenerMutex.unlock();
+    }
+  }
 }
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 2f7ac27de..376409231 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -245,14 +245,6 @@ public class ZkConnection implements IZkConnection {
     _zk.addAuthInfo(scheme, auth);
   }
 
-  private void lookupGetChildrenMethod() {
-    _getChildrenMethod = doLookUpGetChildrenMethod();
-
-    LOG.info("Pagination config {}={}, method to be invoked: {}",
-        ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, 
GETCHILDREN_PAGINATION_DISABLED,
-        _getChildrenMethod.getName());
-  }
-
   @Override
   public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
       throws KeeperException, InterruptedException {
@@ -265,6 +257,14 @@ public class ZkConnection implements IZkConnection {
     _zk.removeWatches(path, watcher, watcherType, true);
   }
 
+  private void lookupGetChildrenMethod() {
+    _getChildrenMethod = doLookUpGetChildrenMethod();
+
+    LOG.info("Pagination config {}={}, method to be invoked: {}",
+        ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, 
GETCHILDREN_PAGINATION_DISABLED,
+        _getChildrenMethod.getName());
+  }
+
   private Method doLookUpGetChildrenMethod() {
     if (!GETCHILDREN_PAGINATION_DISABLED) {
       try {
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
new file mode 100644
index 000000000..1a13fc3ba
--- /dev/null
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
@@ -0,0 +1,111 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.impl.ZkTestHelper;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkClientPersistWatcher extends ZkTestBase {
+
+  @Test
+  void testZkClientDataChange() throws Exception {
+    ZkClient.Builder builder = new ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR)
+        .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
+    ZkClient zkClient = builder.build();
+    zkClient.setZkSerializer(new BasicZkSerializer(new 
SerializableSerializer()));
+    int count = 1000;
+    final int[] event_count = {0};
+    CountDownLatch countDownLatch1 = new CountDownLatch(count);
+    String path = "/dataChangeTestPath";
+    IZkDataListener dataListener = new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws 
Exception {
+        countDownLatch1.countDown();
+        event_count[0]++ ;
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+    };
+
+    zkClient.subscribeDataChanges(path, dataListener);
+    zkClient.create(path, "datat", CreateMode.PERSISTENT);
+    for(int i=0; i<count; ++i) {
+      zkClient.writeData(path, ("datat"+i), -1);
+    }
+
+    Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
+    zkClient.close();
+  }
+
+  @Test(dependsOnMethods = "testZkClientDataChange")
+  void testZkClientChildChange() throws Exception {
+    ZkClient.Builder builder = new ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR)
+        .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
+    ZkClient zkClient = builder.build();
+    zkClient.setZkSerializer(new BasicZkSerializer(new 
SerializableSerializer()));
+    int count = 100;
+    final int[] event_count = {0};
+    CountDownLatch countDownLatch1 = new CountDownLatch(count);
+    CountDownLatch countDownLatch2 = new CountDownLatch(count);
+    String path = "/testZkClientChildChange";
+    IZkChildListener childListener = new IZkChildListener() {
+      @Override
+      public void handleChildChange(String parentPath, List<String> 
currentChilds)
+          throws Exception {
+        countDownLatch1.countDown();
+        event_count[0]++ ;
+      }
+    };
+    IZkChildListener childListener2 = new IZkChildListener() {
+      @Override
+      public void handleChildChange(String parentPath, List<String> 
currentChilds)
+          throws Exception {
+        countDownLatch2.countDown();
+        event_count[0]++ ;
+      }
+    };
+    zkClient.subscribeChildChanges(path, childListener);
+    zkClient.subscribeChildChanges(path, childListener2);
+    zkClient.create(path, "datat", CreateMode.PERSISTENT);
+    for(int i=0; i<count; ++i) {
+      zkClient.create(path + "/child" +i , "datat", CreateMode.PERSISTENT);
+    }
+    Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
+    Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
+    zkClient.close();
+  }
+
+}
\ No newline at end of file

Reply via email to