This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit db68826154a996177487f8433d99e6d082049aba Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Thu Jan 19 17:15:37 2023 -0500 Implement data change listener for ZkMetaClient and test * Implement data change listener for ZkMetaClient and test --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 11 +- .../helix/metaclient/impl/zk/TestZkMetaClient.java | 119 ++++++++++++++++++++- .../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +- 3 files changed, 128 insertions(+), 4 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 2fada4b28..00ad18f73 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -22,6 +22,7 @@ package org.apache.helix.metaclient.impl.zk; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.metaclient.api.AsyncCallback; import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.ConnectStateChangeListener; @@ -50,10 +51,12 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.EphemeralType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { - + private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class); private final ZkClient _zkClient; private final int _connectionTimeout; @@ -255,6 +258,10 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode, boolean persistListener) { + if (!persistListener) { + throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient."); + } + _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener)); return false; } @@ -279,7 +286,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void unsubscribeDataChange(String key, DataChangeListener listener) { - + _zkClient.unsubscribeDataChanges(key, new DataListenerConverter(listener)); } @Override diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index d822c2fa8..553ce22e0 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -30,6 +30,14 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.metaclient.api.DataUpdater; import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.constants.MetaClientException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.helix.metaclient.api.DataChangeListener; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; import org.apache.helix.zookeeper.zkclient.ZkServer; @@ -44,8 +52,11 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS public class TestZkMetaClient { private static final String ZK_ADDR = "localhost:2183"; - private ZkServer _zkServer; + private static final int DEFAULT_TIMEOUT_MS = 1000; private static final String ENTRY_STRING_VALUE = "test-value"; + private final Object _syncObject = new Object(); + + private ZkServer _zkServer; @BeforeClass public void prepare() { @@ -196,7 +207,91 @@ public class TestZkMetaClient { } } + @Test + public void testDataChangeListenerTriggerWithZkWatcher() throws Exception { + final String path = "/TestZkMetaClient_testTriggerWithZkWatcher"; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + MockDataChangeListener listener = new MockDataChangeListener(); + zkMetaClient.subscribeDataChange(path, listener, false, true); + zkMetaClient.create(path, "test-node"); + int expectedCallCount = 0; + synchronized (_syncObject) { + while (listener.getTriggeredCount() == expectedCallCount) { + _syncObject.wait(DEFAULT_TIMEOUT_MS); + } + expectedCallCount++; + Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount); + Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_CREATED); + } + zkMetaClient.set(path, "test-node-changed", -1); + synchronized (_syncObject) { + while (listener.getTriggeredCount() == expectedCallCount) { + _syncObject.wait(DEFAULT_TIMEOUT_MS); + } + expectedCallCount++; + Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount); + Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_UPDATE); + } + zkMetaClient.delete(path); + synchronized (_syncObject) { + while (listener.getTriggeredCount() == expectedCallCount) { + _syncObject.wait(DEFAULT_TIMEOUT_MS); + } + expectedCallCount++; + Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount); + Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_DELETED); + } + // unregister listener, expect no more call + zkMetaClient.unsubscribeDataChange(path, listener); + zkMetaClient.create(path, "test-node"); + synchronized (_syncObject) { + _syncObject.wait(DEFAULT_TIMEOUT_MS); + Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount); + } + // register a new non-persistent listener + try { + zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false); + Assert.fail("One-time listener is not supported, NotImplementedException should be thrown."); + } catch (NotImplementedException e) { + // expected + } + } + } + @Test(dependsOnMethods = "testDataChangeListenerTriggerWithZkWatcher") + public void testMultipleDataChangeListeners() throws Exception { + final String basePath = "/TestZkMetaClient_testMultipleDataChangeListeners"; + final int count = 5; + final String testData = "test-data"; + final AtomicBoolean dataExpected = new AtomicBoolean(true); + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + Map<String, Set<DataChangeListener>> listeners = new HashMap<>(); + CountDownLatch countDownLatch = new CountDownLatch(count); + zkMetaClient.create(basePath + "_1", testData); + // create paths + for (int i = 0; i < 2; i++) { + String path = basePath + "_" + i; + listeners.put(path, new HashSet<>()); + // 5 listeners for each path + for (int j = 0; j < count; j++) { + DataChangeListener listener = new DataChangeListener() { + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) { + countDownLatch.countDown(); + dataExpected.set(dataExpected.get() && testData.equals(data)); + } + }; + listeners.get(path).add(listener); + zkMetaClient.subscribeDataChange(path, listener, false, true); + } + } + zkMetaClient.set(basePath + "_1", testData, -1); + Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS)); + Assert.assertTrue(dataExpected.get()); + } + } private static ZkMetaClient<String> createZkMetaClient() { ZkMetaClientConfig config = @@ -226,4 +321,26 @@ public class TestZkMetaClient { zkServer.start(); return zkServer; } + + private class MockDataChangeListener implements DataChangeListener { + private final AtomicInteger _triggeredCount = new AtomicInteger(0); + private volatile ChangeType _lastEventType; + + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) { + _triggeredCount.getAndIncrement(); + _lastEventType = changeType; + synchronized (_syncObject) { + _syncObject.notifyAll(); + } + } + + int getTriggeredCount() { + return _triggeredCount.get(); + } + + ChangeType getLastEventType() { + return _lastEventType; + } + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 4fca90da9..747b7e7a0 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -2489,7 +2489,7 @@ public class ZkClient implements Watcher { }); } - protected void connect(final long maxMsToWaitUntilConnected) + public void connect(final long maxMsToWaitUntilConnected) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException { connect(maxMsToWaitUntilConnected, this); }
