This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6aa7c2cce8c6202e80df63ef11a1ee1af085b7dc Author: xyuanlu <[email protected]> AuthorDate: Thu Jul 13 10:25:10 2023 -0700 ZkClient - only register one time watcher for read data when not using persist listener. (#2555) --------- Co-authored-by: Xiaoyuan Lu <[email protected]> --- .../helix/metaclient/impl/zk/TestZkMetaClient.java | 22 +++++++ .../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +- .../zkclient/TestZkClientPersistWatcher.java | 75 +++++++++++++--------- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 8aca150b0..19f21977b 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -504,6 +504,28 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ } } + @Test + public void testChangeListener() throws Exception { + final String basePath = "/TestZkMetaClient_ChangeListener"; + final int count = 100; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + DataChangeListener listener = new DataChangeListener() { + + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) + throws Exception { + } + }; + zkMetaClient.subscribeDataChange(basePath, listener, false); + zkMetaClient.create(basePath, ""); + zkMetaClient.get(basePath); + zkMetaClient.exists(basePath); + zkMetaClient.getDataAndStat(basePath); + zkMetaClient.getDirectChildrenKeys(basePath); + } + } + /** * Transactional op calls zk.multi() with a set of ops (operations) * and the return values are converted into metaclient opResults. diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 064f6b494..2a06158d0 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -2232,7 +2232,7 @@ public class ZkClient implements Watcher { @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, Stat stat) { - return (T) readData(path, stat, hasChildOrDataListeners(path)); + return (T) readData(path, stat, (!_usePersistWatcher) && hasChildOrDataListeners(path)); } @SuppressWarnings("unchecked") diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java index 76f5352c9..c54bca1ef 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java @@ -41,8 +41,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { void testZkClientDataChange() throws Exception { org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder = new org.apache.helix.zookeeper.impl.client.ZkClient.Builder(); - builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false) - .setUsePersistWatcher(true); + builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true); org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build(); zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); int count = 1000; @@ -63,8 +62,8 @@ public class TestZkClientPersistWatcher extends ZkTestBase { zkClient.subscribeDataChanges(path, dataListener); zkClient.create(path, "datat", CreateMode.PERSISTENT); - for(int i=0; i<count; ++i) { - zkClient.writeData(path, ("datat"+i), -1); + for (int i = 0; i < count; ++i) { + zkClient.writeData(path, ("datat" + i), -1); } Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS)); @@ -75,8 +74,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { void testZkClientChildChange() throws Exception { org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder = new org.apache.helix.zookeeper.impl.client.ZkClient.Builder(); - builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false) - .setUsePersistWatcher(true); + builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true); org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build(); zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); int count = 100; @@ -86,16 +84,14 @@ public class TestZkClientPersistWatcher extends ZkTestBase { String path = "/testZkClientChildChange"; IZkChildListener childListener = new IZkChildListener() { @Override - public void handleChildChange(String parentPath, List<String> currentChilds) - throws Exception { + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { countDownLatch1.countDown(); - event_count[0]++ ; + event_count[0]++; } }; IZkChildListener childListener2 = new IZkChildListener() { @Override - public void handleChildChange(String parentPath, List<String> currentChilds) - throws Exception { + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { countDownLatch2.countDown(); event_count[0]++; } @@ -108,6 +104,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { } Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS)); Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS)); + zkClient.deleteRecursively(path); zkClient.close(); } @@ -115,8 +112,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { void testZkClientPersistRecursiveChange() throws Exception { org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder = new org.apache.helix.zookeeper.impl.client.ZkClient.Builder(); - builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false) - .setUsePersistWatcher(true); + builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true); org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build(); zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); int count = 100; @@ -124,39 +120,37 @@ public class TestZkClientPersistWatcher extends ZkTestBase { final AtomicInteger[] event_count2 = {new AtomicInteger(0)}; // for each iteration, we will edit a node, create a child, create a grand child, and // delete child. Expect 4 event per iteration. -> total event should be count*4 - CountDownLatch countDownLatch1 = new CountDownLatch(count*4); + CountDownLatch countDownLatch1 = new CountDownLatch(count * 4); CountDownLatch countDownLatch2 = new CountDownLatch(count); String path = "/testZkClientPersistRecursiveChange"; RecursivePersistListener rcListener = new RecursivePersistListener() { @Override - public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) - throws Exception { + public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception { countDownLatch1.countDown(); - event_count[0].incrementAndGet() ; + event_count[0].incrementAndGet(); } }; zkClient.create(path, "datat", CreateMode.PERSISTENT); zkClient.subscribePersistRecursiveListener(path, rcListener); - for (int i=0; i<count; ++i) { + for (int i = 0; i < count; ++i) { zkClient.writeData(path, "data7" + i, -1); - zkClient.create(path+"/c1_" +i , "datat", CreateMode.PERSISTENT); - zkClient.create(path+"/c1_" +i + "/c2", "datat", CreateMode.PERSISTENT); - zkClient.delete(path+"/c1_" +i + "/c2"); + zkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT); + zkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT); + zkClient.delete(path + "/c1_" + i + "/c2"); } Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS)); // subscribe a persist child watch, it should throw exception IZkChildListener childListener2 = new IZkChildListener() { @Override - public void handleChildChange(String parentPath, List<String> currentChilds) - throws Exception { + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { countDownLatch2.countDown(); event_count2[0].incrementAndGet(); } }; try { zkClient.subscribeChildChanges(path, childListener2, false); - } catch ( Exception ex) { + } catch (Exception ex) { Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException"); } @@ -164,14 +158,15 @@ public class TestZkClientPersistWatcher extends ZkTestBase { zkClient.unsubscribePersistRecursiveListener(path, rcListener); zkClient.subscribeChildChanges(path, childListener2, false); // we should only get 100 event since only 100 direct child change. - for (int i=0; i<count; ++i) { + for (int i = 0; i < count; ++i) { zkClient.writeData(path, "data7" + i, -1); - zkClient.create(path+"/c2_" +i , "datat", CreateMode.PERSISTENT); - zkClient.create(path+"/c2_" +i + "/c3", "datat", CreateMode.PERSISTENT); - zkClient.delete(path+"/c2_" +i + "/c3"); + zkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT); + zkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT); + zkClient.delete(path + "/c2_" + i + "/c3"); } Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS)); + zkClient.deleteRecursively(path); zkClient.close(); } @@ -179,8 +174,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { void testSubscribeOneTimeChangeWhenUsingPersistWatcher() { org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder = new org.apache.helix.zookeeper.impl.client.ZkClient.Builder(); - builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false) - .setUsePersistWatcher(true); + builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true); ZkClient zkClient = builder.build(); zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); @@ -194,7 +188,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase { } try { - zkClient.readData(path, null, true); + zkClient.readData(path, null, true); Assert.fail("Should throw exception when subscribe one time listener"); } catch (Exception e) { Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException"); @@ -206,5 +200,24 @@ public class TestZkClientPersistWatcher extends ZkTestBase { } catch (Exception e) { Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException"); } + zkClient.delete(path); + zkClient.close(); + } + + @Test + void testCrudOperationWithResubscribe() { + org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder = + new org.apache.helix.zookeeper.impl.client.ZkClient.Builder(); + builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(false); + ZkClient zkClient = builder.build(); + zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer())); + + String path = "/testCrudOperationWithResubscribe"; + zkClient.create(path, "datat", CreateMode.PERSISTENT); + zkClient.exists(path, true); + zkClient.readData(path, null, true); + zkClient.getChildren(path, true); + zkClient.delete(path); + zkClient.close(); } } \ No newline at end of file
