This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 19cece4f6 Add exception for register ZK one time watcher when
_usePersistWatcher flag is on (#2465)
19cece4f6 is described below
commit 19cece4f62476a21b427d3ede620100fdeabe446
Author: xyuanlu <[email protected]>
AuthorDate: Tue May 9 17:54:51 2023 -0700
Add exception for register ZK one time watcher when _usePersistWatcher flag
is on (#2465)
When ZkClient registers a one-time listener on a path that already has a
persist watcher registered, ZK will overwrite the persist watcher and only
trigger an event once when a change happens. In Helix ZkClient, we should disable
one-time watcher registration when _usePersistWatcher is set to true.
---
.../apache/helix/zookeeper/zkclient/ZkClient.java | 16 +++++
.../zookeeper/impl/TestZooKeeperConnection.java | 68 ++++++++++++++++++++++
.../TestZkClientPersistWatcher.java | 64 +++++++++++++++-----
3 files changed, 132 insertions(+), 16 deletions(-)
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 41c23a6d1..fb94e5436 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1360,6 +1360,7 @@ public class ZkClient implements Watcher {
}
protected List<String> getChildren(final String path, final boolean watch) {
+ validateNativeZkWatcherType(watch);
long startT = System.currentTimeMillis();
try {
@@ -1424,6 +1425,7 @@ public class ZkClient implements Watcher {
}
protected boolean exists(final String path, final boolean watch) {
+ validateNativeZkWatcherType(watch);
long startT = System.currentTimeMillis();
try {
boolean exists = retryUntilConnected(new Callable<Boolean>() {
@@ -1453,6 +1455,7 @@ public class ZkClient implements Watcher {
}
private Stat getStat(final String path, final boolean watch) {
+ validateNativeZkWatcherType(watch);
long startT = System.currentTimeMillis();
final Stat stat;
try {
@@ -2152,6 +2155,7 @@ public class ZkClient implements Watcher {
@SuppressWarnings("unchecked")
public <T extends Object> T readData(final String path, final Stat stat,
final boolean watch) {
+ validateNativeZkWatcherType(watch);
long startT = System.currentTimeMillis();
byte[] data = null;
try {
@@ -3047,4 +3051,16 @@ public class ZkClient implements Watcher {
_persistListenerMutex.unlock();
}
}
+
+ /*
+ Throws an exception when trying to subscribe a watch while using
_usePersistWatcher. When ZkClient
+ is subscribed as a persist watcher, resubscribing the same object as a
one-time watcher will
+ overwrite the persist watcher, causing subsequent events to be missed.
+ */
+ private void validateNativeZkWatcherType(boolean watch) {
+ if (_usePersistWatcher && watch) {
+ throw new IllegalArgumentException(
+ "Can not subscribe one time watcher when ZkClient is using
PersistWatcher");
+ }
+ }
}
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
new file mode 100644
index 000000000..1ea51e2f9
--- /dev/null
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
@@ -0,0 +1,68 @@
+package org.apache.helix.zookeeper.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.zkclient.IZkConnection;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZooKeeperConnection extends ZkTestBase {
+ final int count = 100;
+ final int[] get_count = {0};
+ CountDownLatch countDownLatch = new CountDownLatch(count*2);
+
+
+ /*
+ This function tests persist watchers' behavior in {@link
org.apache.helix.zookeeper.zkclient.ZkConnection}
+ 1. Register a persist watcher on a path and create 100 children Znode, edit
the ZNode for 100 times.
+ Expecting 200 events.
+ 2. register a one time listener on the path. Make the same change and count
the total number of event.
+ */
+ @Test
+ void testPersistWatcher() throws Exception {
+ Watcher watcher1 = new PersistWatcher();
+ ZkClient zkClient = new
org.apache.helix.zookeeper.impl.client.ZkClient(ZK_ADDR);
+ IZkConnection _zk = zkClient.getConnection();
+ String path="/testPersistWatcher";
+ _zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ // register a persist listener on a path, change the ZNode 100 times,
create 100 child ZNode,
+ // and expecting 200 events
+ _zk.addWatch(path, watcher1, AddWatchMode.PERSISTENT);
+ for (int i=0; i<count; ++i) {
+ _zk.writeData(path, "datat".getBytes(), -1);
+ _zk.create(path+"/c1_" +i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+ Assert.assertTrue(countDownLatch.await(50000, TimeUnit.MILLISECONDS));
+
+ // Register a one-time listener on the path. Count the total number of
events.
+ // ZK will overwrite the persist watcher and only trigger an event once for
child and data change.
+ _zk.readData(path, null, true);
+ _zk.getChildren(path, true);
+ for (int i=0; i<200; ++i) {
+ _zk.writeData(path, ("datat"+i).getBytes(), -1);
+ _zk.create(path+"/c2_" +i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+ Assert.assertTrue(TestHelper.verify(() -> {
+ return (get_count[0] == 202);
+ }, TestHelper.WAIT_DURATION));
+ System.out.println("testPersistWatcher received event count: " +
get_count[0]);
+ zkClient.close();
+ }
+
+ class PersistWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ get_count[0]++;
+ countDownLatch.countDown();
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
similarity index 61%
rename from
zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
rename to
zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index 1a13fc3ba..3e09d1f43 100644
---
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -1,4 +1,4 @@
-package org.apache.helix.zookeeper.impl.client;
+package org.apache.helix.zookeeper.zkclient;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -24,9 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.impl.ZkTestBase;
-import org.apache.helix.zookeeper.impl.ZkTestHelper;
-import org.apache.helix.zookeeper.zkclient.IZkChildListener;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
@@ -38,10 +36,11 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
@Test
void testZkClientDataChange() throws Exception {
- ZkClient.Builder builder = new ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR)
- .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
- ZkClient zkClient = builder.build();
+ org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+ new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+ .setUsePersistWatcher(true);
+ org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new
SerializableSerializer()));
int count = 1000;
final int[] event_count = {0};
@@ -51,7 +50,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
@Override
public void handleDataChange(String dataPath, Object data) throws
Exception {
countDownLatch1.countDown();
- event_count[0]++ ;
+ event_count[0]++;
}
@Override
@@ -71,10 +70,11 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
@Test(dependsOnMethods = "testZkClientDataChange")
void testZkClientChildChange() throws Exception {
- ZkClient.Builder builder = new ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR)
- .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
- ZkClient zkClient = builder.build();
+ org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+ new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+ .setUsePersistWatcher(true);
+ org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new
SerializableSerializer()));
int count = 100;
final int[] event_count = {0};
@@ -94,18 +94,50 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
public void handleChildChange(String parentPath, List<String>
currentChilds)
throws Exception {
countDownLatch2.countDown();
- event_count[0]++ ;
+ event_count[0]++;
}
};
zkClient.subscribeChildChanges(path, childListener);
zkClient.subscribeChildChanges(path, childListener2);
zkClient.create(path, "datat", CreateMode.PERSISTENT);
- for(int i=0; i<count; ++i) {
- zkClient.create(path + "/child" +i , "datat", CreateMode.PERSISTENT);
+ for (int i = 0; i < count; ++i) {
+ zkClient.create(path + "/child" + i, "datat", CreateMode.PERSISTENT);
}
Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
zkClient.close();
}
+ @Test
+ void testSubscribeOneTimeChangeWhenUsingPersistWatcher() {
+ org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+ new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+ .setUsePersistWatcher(true);
+ ZkClient zkClient = builder.build();
+ zkClient.setZkSerializer(new BasicZkSerializer(new
SerializableSerializer()));
+
+ String path = "/testSubscribeOneTimeChangeWhenUsingPersistWatcher";
+ zkClient.create(path, "datat", CreateMode.PERSISTENT);
+ try {
+ zkClient.exists(path, true);
+ Assert.fail("Should throw exception when subscribe one time listener");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getClass().getName(),
"java.lang.IllegalArgumentException");
+ }
+
+ try {
+ zkClient.readData(path, null, true);
+ Assert.fail("Should throw exception when subscribe one time listener");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getClass().getName(),
"java.lang.IllegalArgumentException");
+ }
+
+ try {
+ zkClient.getChildren(path, true);
+ Assert.fail("Should throw exception when subscribe one time listener");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getClass().getName(),
"java.lang.IllegalArgumentException");
+ }
+ }
}
\ No newline at end of file