This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4819f6fbcb71a930092411731abd03b918fa2265 Author: xyuanlu <[email protected]> AuthorDate: Tue Apr 25 09:29:54 2023 -0700 Use persist watcher for listener registration in ZkClient (when configured) (#2432) Before ZK 3.6, all zk watchers are one time watcher. ZkClient used to resubscribe every time when an event happens. This change adopts ZK persist watcher. --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 2 +- .../apache/helix/metaclient/impl/zk/TestUtil.java | 96 ++++++++++++++ .../helix/metaclient/impl/zk/TestZkMetaClient.java | 73 +++++++++-- .../helix/zookeeper/zkclient/IZkConnection.java | 5 +- .../apache/helix/zookeeper/zkclient/ZkClient.java | 142 +++++++++++++++++++-- .../helix/zookeeper/zkclient/ZkConnection.java | 16 +-- .../impl/client/TestZkClientPersistWatcher.java | 111 ++++++++++++++++ 7 files changed, 414 insertions(+), 31 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 984766254..6888ee4ed 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -315,7 +315,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode) { _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener)); - return false; + return true; } @Override diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java new file mode 100644 index 000000000..1296a72f3 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java @@ -0,0 +1,96 @@ +package org.apache.helix.metaclient.impl.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.zkclient.ZkClient; +import org.apache.helix.zookeeper.zkclient.ZkConnection; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + + +public class TestUtil { + + static java.lang.reflect.Field getField(Class clazz, String fieldName) + throws NoSuchFieldException { + try { + return clazz.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + Class superClass = clazz.getSuperclass(); + if (superClass == null) { + throw e; + } + return getField(superClass, fieldName); + } + } + + public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client) + throws Exception { + Map<String, List<String>> lists = new HashMap<String, List<String>>(); + ZkClient zkClient = (ZkClient) client; + + ZkConnection connection = ((ZkConnection) zkClient.getConnection()); + ZooKeeper zk = connection.getZookeeper(); + + java.lang.reflect.Field field = getField(zk.getClass(), "watchManager"); + field.setAccessible(true); + Object watchManager = field.get(zk); + + java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches"); + field2.setAccessible(true); + HashMap<String, Set<Watcher>> dataWatches = + (HashMap<String, Set<Watcher>>) field2.get(watchManager); + + field2 = getField(watchManager.getClass(), "existWatches"); + field2.setAccessible(true); + HashMap<String, Set<Watcher>> existWatches = + (HashMap<String, Set<Watcher>>) field2.get(watchManager); + + field2 = getField(watchManager.getClass(), "childWatches"); + field2.setAccessible(true); + HashMap<String, Set<Watcher>> childWatches = + (HashMap<String, Set<Watcher>>) field2.get(watchManager); + + field2 = getField(watchManager.getClass(), "persistentWatches"); + field2.setAccessible(true); + HashMap<String, Set<Watcher>> persistentWatches = + (HashMap<String, Set<Watcher>>) field2.get(watchManager); + + field2 = getField(watchManager.getClass(), "persistentRecursiveWatches"); + field2.setAccessible(true); + HashMap<String, Set<Watcher>> persistentRecursiveWatches = + (HashMap<String, Set<Watcher>>) field2.get(watchManager); + + + lists.put("dataWatches", new ArrayList<>(dataWatches.keySet())); + lists.put("existWatches", new ArrayList<>(existWatches.keySet())); + lists.put("childWatches", new ArrayList<>(childWatches.keySet())); + lists.put("persistentWatches", new ArrayList<>(persistentWatches.keySet())); + lists.put("persistentRecursiveWatches", new ArrayList<>(persistentRecursiveWatches.keySet())); + + return lists; + } +} \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 6eb624fb8..cbefd2156 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -46,6 +46,7 @@ import org.apache.zookeeper.KeeperException; import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE; import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER; import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT; @@ -323,7 +324,7 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ @Test public void testDirectChildChangeListener() throws Exception { final String basePath = "/TestZkMetaClient_testDirectChildChangeListener"; - final int count = 3; + final int count = 1000; try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { zkMetaClient.connect(); CountDownLatch countDownLatch = new CountDownLatch(count); @@ -337,14 +338,70 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ Assert.assertTrue( zkMetaClient.subscribeDirectChildChange(basePath, listener, false) .isRegistered()); - zkMetaClient.create(basePath + "/child_1", "test-data"); - //TODO: the native zkclient failed to provide persistent listener, and event might be lost. - // Remove Thread.sleep() below when the persistent watcher is supported - Thread.sleep(500); - zkMetaClient.create(basePath + "/child_2", "test-data"); - Thread.sleep(500); - zkMetaClient.create(basePath + "/child_3", "test-data"); + for(int i=0; i<1000; ++i){ + zkMetaClient.create(basePath + "/child_" +i, "test-data"); + } + // Verify no one time watcher is registered. Only one persist listener is registered. + Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient()); + Assert.assertEquals(watchers.get("persistentWatches").size(), 1); + Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath); + Assert.assertEquals(watchers.get("childWatches").size(), 0); + Assert.assertEquals(watchers.get("dataWatches").size(), 0); + Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS)); + + zkMetaClient.unsubscribeDirectChildChange(basePath, listener); + // verify that no listener is registered on any path + watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient()); + Assert.assertEquals(watchers.get("persistentWatches").size(), 0); + Assert.assertEquals(watchers.get("childWatches").size(), 0); + Assert.assertEquals(watchers.get("dataWatches").size(), 0); + } + } + + @Test + public void testDataChangeListener() throws Exception { + final String basePath = "/TestZkMetaClient_testDataChangeListener"; + final int count = 200; + final int[] get_count = {0}; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + CountDownLatch countDownLatch = new CountDownLatch(count); + DataChangeListener listener = new DataChangeListener() { + + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) + throws Exception { + if(changeType == ENTRY_UPDATE) { + get_count[0]++; + countDownLatch.countDown(); + } + } + }; + zkMetaClient.create(basePath, ""); + Assert.assertTrue( + zkMetaClient.subscribeDataChange(basePath, listener, false) + ); + // Verify no one time watcher is registered. Only one persist listener is registered. + Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient()); + Assert.assertEquals(watchers.get("persistentWatches").size(), 1); + Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath); + Assert.assertEquals(watchers.get("childWatches").size(), 0); + Assert.assertEquals(watchers.get("dataWatches").size(), 0); + + for (int i=0; i<200; ++i) { + zkMetaClient.set(basePath, "data7" + i, -1); + } Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS)); + + + zkMetaClient.unsubscribeDataChange(basePath, listener); + // verify that no listener is registered on any path + watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient()); + watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient()); + Assert.assertEquals(watchers.get("persistentWatches").size(), 0); + Assert.assertEquals(watchers.get("childWatches").size(), 0); + Assert.assertEquals(watchers.get("dataWatches").size(), 0); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java index bd94432b4..43eefad26 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkConnection.java @@ -64,8 +64,7 @@ public interface IZkConnection { public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException; public void addAuthInfo(String scheme, byte[] auth); - - public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException, - InterruptedException; + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) + throws KeeperException, InterruptedException; public void removeWatches(String path, Watcher watcher, Watcher.WatcherType watcherType) throws InterruptedException, KeeperException; } \ No newline at end of file diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index a8fb3ede2..41c23a6d1 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; import javax.management.JMException; import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; @@ -59,6 +61,7 @@ import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -103,7 +106,7 @@ public class ZkClient implements Watcher { private static AtomicLong UID = new AtomicLong(0); public final long _uid; - // ZNode write size limit in bytes. + // ZNode write size limit in bytes: // TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+ private static final int WRITE_SIZE_LIMIT = Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, ZNRecord.SIZE_LIMIT); @@ -133,8 +136,14 @@ public class ZkClient implements Watcher { private volatile boolean _closed; private PathBasedZkSerializer _pathBasedZkSerializer; private ZkClientMonitor _monitor; + // When _usePersistWatcher is true, ZKClient will register itself as persist watcher to Zk Server. + // No re-register is needed after change event. + // Default value is false, meaning ZKClient will register itself as regular ont time watcher to + // Zk Server and will re-register after an data or child change. private boolean _usePersistWatcher; + private final ReentrantLock _persistListenerMutex; + // To automatically retry the async operation, we need a separate thread other than the // ZkEventThread. Otherwise the retry request might block the normal event processing. protected final ZkAsyncRetryThread _asyncCallRetryThread; @@ -247,6 +256,7 @@ public class ZkClient implements Watcher { connect(connectionTimeout, this); } _usePersistWatcher = usePersistWatcher; + _persistListenerMutex = new ReentrantLock(); } protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, @@ -262,9 +272,13 @@ public class ZkClient implements Watcher { } public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) { + if (_usePersistWatcher) { + addPersistListener(path, listener); + } else { synchronized (_childListener) { addChildListener(path, listener); } + } List<String> children = watchForChilds(path, skipWatchingNonExistNode); if (children == null && skipWatchingNonExistNode) { @@ -277,15 +291,22 @@ public class ZkClient implements Watcher { } public void unsubscribeChildChanges(String path, IZkChildListener childListener) { + if (_usePersistWatcher) { + removePersistListener(path, childListener); + } else { synchronized (_childListener) { removeChildListener(path, childListener); + } } } public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) { - + if (_usePersistWatcher) { + addPersistListener(path, listener); + } else { synchronized (_dataListener) { addDataListener(path, listener); + } } boolean watchInstalled = watchForData(path, skipWatchingNonExistNode); @@ -336,9 +357,13 @@ public class ZkClient implements Watcher { } public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { + if (_usePersistWatcher) { + removePersistListener(path, dataListener); + } else { synchronized (_dataListener) { removeDataListener(path, dataListener); } + } } public void subscribeStateChanges(final IZkStateListener listener) { @@ -384,6 +409,20 @@ public class ZkClient implements Watcher { } public void unsubscribeAll() { + if (_usePersistWatcher) { + ManipulateListener removeAllListeners = () -> { + Stream.concat(_childListener.keySet().stream(), _dataListener.keySet().stream()) + .forEach(p -> { + try { + getConnection().removeWatches(p, this, WatcherType.Any); + } catch (InterruptedException | KeeperException e) { + LOG.error("Failed to remove persistent watcher for {} ", p, e); + } + }); + }; + executeWithInPersistListenerMutex(removeAllListeners); + return; + } synchronized (_childListener) { _childListener.clear(); } @@ -1228,6 +1267,10 @@ public class ZkClient implements Watcher { if (LOG.isDebugEnabled()) { LOG.debug("zkclient {}, Received event: {} ", _uid, event); } + + if (event.getType() == EventType.PersistentWatchRemoved) { + return; + } _zookeeperEventThread = Thread.currentThread(); boolean stateChanged = event.getPath() == null; @@ -1313,7 +1356,7 @@ public class ZkClient implements Watcher { * are deleted before the last page is fetched. The upstream caller should be able to handle this. */ public List<String> getChildren(String path) { - return getChildren(path, hasListeners(path)); + return getChildren(path, (!_usePersistWatcher) && hasListeners(path)); } protected List<String> getChildren(final String path, final boolean watch) { @@ -1768,7 +1811,8 @@ public class ZkClient implements Watcher { // the exists() useGetData (false) route to check stat. Otherwise, we use getData() // to install watch. Stat stat = null; - if (!pathExists) { + // no register one time watcher when _usePersistWatcher is true. + if (_usePersistWatcher || !pathExists) { stat = getStat(path, false); } else { stat = installWatchOnlyPathExist(path); @@ -1785,7 +1829,7 @@ public class ZkClient implements Watcher { } try { // TODO: the data is redundantly read multiple times when multiple listeners exist - data = readData(path, null, true); + data = readData(path, null, !_usePersistWatcher); } catch (ZkNoNodeException e) { LOG.warn("zkclient {} Prefetch data for path: {} failed.", _uid, path, e); listener.getDataListener().handleDataDeleted(path); @@ -1812,7 +1856,7 @@ public class ZkClient implements Watcher { public void run() throws Exception { if (!pathStatRecord.pathChecked()) { Stat stat = null; - if (!pathExists || !hasListeners(path)) { + if (_usePersistWatcher || !pathExists || !hasListeners(path)) { // will not install listener using exists call stat = getStat(path, false); } else { @@ -2379,10 +2423,25 @@ public class ZkClient implements Watcher { private boolean watchForData(final String path, boolean skipWatchingNonExistNode) { try { - if (skipWatchingNonExistNode) { - retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().getData(path, true, new Stat()))); + if (_usePersistWatcher) { + return retryUntilConnected(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + if (!skipWatchingNonExistNode || exists(path)) { + getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT); + return true; + } + return false; + } + }); } else { - retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().exists(path, true))); + if (skipWatchingNonExistNode) { + retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper() + .getData(path, true, new Stat()))); + } else { + retryUntilConnected( + () -> (((ZkConnection) getConnection()).getZookeeper().exists(path, true))); + } } } catch (ZkNoNodeException e) { // Do nothing, this is what we want as this is not going to leak watch in ZooKeeepr server. @@ -2434,11 +2493,20 @@ public class ZkClient implements Watcher { return retryUntilConnected(new Callable<List<String>>() { @Override public List<String> call() throws Exception { - if (!skipWatchingNonExistNode) { + // We only register one time watcher without checking in path exists + // when _usePersistWatcher is false and skipWatchingNonExistNode is false. + if (!skipWatchingNonExistNode && !_usePersistWatcher) { exists(path, true); } try { - return getChildren(path, true); + if (_usePersistWatcher) { + if (!skipWatchingNonExistNode || exists(path)) { + getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT); + } + return getChildren(path, false); + } else { + return getChildren(path, true); + } } catch (ZkNoNodeException e) { // ignore, the "exists" watch will listen for the parent node to appear LOG.info("zkclient{} watchForChilds path not existing:{} skipWatchingNodeNoteExist: {}", @@ -2927,4 +2995,56 @@ public class ZkClient implements Watcher { listeners.remove(listener); } } + + interface ManipulateListener { + void run() throws KeeperException, InterruptedException; + } + + private void addPersistListener(String path, Object listener) { + ManipulateListener addListeners = () -> { + if (listener instanceof IZkChildListener) { + addChildListener(path, (IZkChildListener) listener); + } else if (listener instanceof IZkDataListener) { + addDataListener(path, (IZkDataListener) listener); + } + }; + executeWithInPersistListenerMutex(addListeners); + } + + + // TODO: Consider create an empty interface and let the two listeners interface extend that + // interface for code clean. + private void removePersistListener(String path, Object listener) { + + ManipulateListener removeListeners = () -> { + try { + if (listener instanceof IZkChildListener) { + removeChildListener(path, (IZkChildListener) listener); + } else if (listener instanceof IZkDataListener) { + removeDataListener(path, (IZkDataListener) listener); + } + if (!hasListeners(path)) { + // TODO: update hasListeners logic when recursive persist listener is added + getConnection().removeWatches(path, this, WatcherType.Any); + } + } catch (KeeperException.NoWatcherException e) { + LOG.warn("Persist watcher is already removed"); + } + }; + + executeWithInPersistListenerMutex(removeListeners); + } + + private void executeWithInPersistListenerMutex(ManipulateListener runnable) { + try { + _persistListenerMutex.lockInterruptibly(); + runnable.run(); + } catch (KeeperException.NoWatcherException e) { + LOG.warn("Persist watcher is already removed"); + } catch (KeeperException | InterruptedException ex) { + throw new ZkException(ex); + } finally { + _persistListenerMutex.unlock(); + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java index 2f7ac27de..376409231 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java @@ -245,14 +245,6 @@ public class ZkConnection implements IZkConnection { _zk.addAuthInfo(scheme, auth); } - private void lookupGetChildrenMethod() { - _getChildrenMethod = doLookUpGetChildrenMethod(); - - LOG.info("Pagination config {}={}, method to be invoked: {}", - ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED, - _getChildrenMethod.getName()); - } - @Override public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException, InterruptedException { @@ -265,6 +257,14 @@ public class ZkConnection implements IZkConnection { _zk.removeWatches(path, watcher, watcherType, true); } + private void lookupGetChildrenMethod() { + _getChildrenMethod = doLookUpGetChildrenMethod(); + + LOG.info("Pagination config {}={}, method to be invoked: {}", + ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED, + _getChildrenMethod.getName()); + } + private Method doLookUpGetChildrenMethod() { if (!GETCHILDREN_PAGINATION_DISABLED) { try { diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java new file mode 100644 index 000000000..1a13fc3ba --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java @@ -0,0 +1,111 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.zookeeper.impl.ZkTestBase; +import org.apache.helix.zookeeper.impl.ZkTestHelper; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer; +import org.apache.zookeeper.CreateMode; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestZkClientPersistWatcher extends ZkTestBase { + + @Test + void testZkClientDataChange() throws Exception { + ZkClient.Builder builder = new ZkClient.Builder(); + builder.setZkServer(ZkTestBase.ZK_ADDR) + .setMonitorRootPathOnly(false).setUsePersistWatcher(true); + ZkClient zkClient = builder.build(); + zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); + int count = 1000; + final int[] event_count = {0}; + CountDownLatch countDownLatch1 = new CountDownLatch(count); + String path = "/dataChangeTestPath"; + IZkDataListener dataListener = new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + countDownLatch1.countDown(); + event_count[0]++ ; + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + } + }; + + zkClient.subscribeDataChanges(path, dataListener); + zkClient.create(path, "datat", CreateMode.PERSISTENT); + for(int i=0; i<count; ++i) { + zkClient.writeData(path, ("datat"+i), -1); + } + + Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS)); + zkClient.close(); + } + + @Test(dependsOnMethods = "testZkClientDataChange") + void testZkClientChildChange() throws Exception { + ZkClient.Builder builder = new ZkClient.Builder(); + builder.setZkServer(ZkTestBase.ZK_ADDR) + .setMonitorRootPathOnly(false).setUsePersistWatcher(true); + ZkClient zkClient = builder.build(); + zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); + int count = 100; + final int[] event_count = {0}; + CountDownLatch countDownLatch1 = new CountDownLatch(count); + CountDownLatch countDownLatch2 = new CountDownLatch(count); + String path = "/testZkClientChildChange"; + IZkChildListener childListener = new IZkChildListener() { + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) + throws Exception { + countDownLatch1.countDown(); + event_count[0]++ ; + } + }; + IZkChildListener childListener2 = new IZkChildListener() { + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) + throws Exception { + countDownLatch2.countDown(); + event_count[0]++ ; + } + }; + zkClient.subscribeChildChanges(path, childListener); + zkClient.subscribeChildChanges(path, childListener2); + zkClient.create(path, "datat", CreateMode.PERSISTENT); + for(int i=0; i<count; ++i) { + zkClient.create(path + "/child" +i , "datat", CreateMode.PERSISTENT); + } + Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS)); + Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS)); + zkClient.close(); + } + +} \ No newline at end of file
