This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new d379f9fc8 Implement data change listener for ZkMetaClient and test
d379f9fc8 is described below
commit d379f9fc81370a6fa2b939efb1541910bc2fe553
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Thu Jan 19 17:15:37 2023 -0500
Implement data change listener for ZkMetaClient and test
* Implement data change listener for ZkMetaClient and test
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 11 +-
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 119 ++++++++++++++++++++-
.../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +-
3 files changed, 128 insertions(+), 4 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 2fada4b28..00ad18f73 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.metaclient.impl.zk;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.AsyncCallback;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
@@ -50,10 +51,12 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.EphemeralType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final int _connectionTimeout;
@@ -255,6 +258,10 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
+ /**
+ * Registers a persistent data change listener on {@code key}.
+ *
+ * @param key entry whose data changes should be reported
+ * @param listener callback; it is wrapped in a DataListenerConverter before being handed to the ZkClient
+ * @param skipWatchingNonExistNode currently not consulted by this implementation
+ * @param persistListener must be {@code true}; one-time listeners are rejected
+ * @return {@code true} once the listener has been handed to the underlying ZkClient
+ * @throws NotImplementedException if {@code persistListener} is {@code false}
+ */
@Override
public boolean subscribeDataChange(String key, DataChangeListener listener,
boolean skipWatchingNonExistNode, boolean persistListener) {
+ // Only persistent listeners are supported, so reject one-time listeners before registering anything.
+ if (!persistListener) {
+ throw new NotImplementedException(
+ "Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
+ }
+ _zkClient.subscribeDataChanges(key, new DataListenerConverter(listener));
- return false;
+ // Reaching this point means the registration call returned normally, so report success.
+ return true;
}
@@ -279,7 +286,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public void unsubscribeDataChange(String key, DataChangeListener listener) {
-
+ // Wrap the caller's listener in a converter, as subscribeDataChange does, and ask the ZkClient to drop it.
+ // NOTE(review): a fresh converter instance is passed here, so removal presumably relies on
+ // DataListenerConverter defining equals/hashCode over the wrapped listener -- confirm.
+ DataListenerConverter wrappedListener = new DataListenerConverter(listener);
+ _zkClient.unsubscribeDataChanges(key, wrappedListener);
}
@Override
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index d822c2fa8..553ce22e0 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -30,6 +30,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.constants.MetaClientException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -44,8 +52,11 @@ import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS
public class TestZkMetaClient {
private static final String ZK_ADDR = "localhost:2183";
- private ZkServer _zkServer;
+ private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final String ENTRY_STRING_VALUE = "test-value";
+ private final Object _syncObject = new Object();
+
+ private ZkServer _zkServer;
@BeforeClass
public void prepare() {
@@ -196,7 +207,91 @@ public class TestZkMetaClient {
}
}
+  /**
+   * Verifies that a persistent data change listener is notified on entry creation, update and
+   * deletion, receives nothing after being unsubscribed, and that one-time listeners are rejected.
+   */
+  @Test
+  public void testDataChangeListenerTriggerWithZkWatcher() throws Exception {
+    final String path = "/TestZkMetaClient_testTriggerWithZkWatcher";
+    // Upper bound for each expected notification, so a lost event fails the test instead of
+    // blocking it forever.
+    final long eventTimeoutMs = 5L * DEFAULT_TIMEOUT_MS;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      MockDataChangeListener listener = new MockDataChangeListener();
+      zkMetaClient.subscribeDataChange(path, listener, false, true);
+      int expectedCallCount = 0;
+
+      zkMetaClient.create(path, "test-node");
+      expectedCallCount++;
+      awaitTriggeredCount(listener, expectedCallCount, eventTimeoutMs);
+      Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      Assert.assertEquals(listener.getLastEventType(),
+          DataChangeListener.ChangeType.ENTRY_CREATED);
+
+      zkMetaClient.set(path, "test-node-changed", -1);
+      expectedCallCount++;
+      awaitTriggeredCount(listener, expectedCallCount, eventTimeoutMs);
+      Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      Assert.assertEquals(listener.getLastEventType(),
+          DataChangeListener.ChangeType.ENTRY_UPDATE);
+
+      zkMetaClient.delete(path);
+      expectedCallCount++;
+      awaitTriggeredCount(listener, expectedCallCount, eventTimeoutMs);
+      Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+      Assert.assertEquals(listener.getLastEventType(),
+          DataChangeListener.ChangeType.ENTRY_DELETED);
+
+      // unregister listener, expect no more call
+      zkMetaClient.unsubscribeDataChange(path, listener);
+      zkMetaClient.create(path, "test-node");
+      // Give an unexpected notification DEFAULT_TIMEOUT_MS to show up; the count must not move.
+      awaitTriggeredCount(listener, expectedCallCount + 1, DEFAULT_TIMEOUT_MS);
+      Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
+
+      // register a new non-persistent listener
+      try {
+        zkMetaClient.subscribeDataChange(path, new MockDataChangeListener(), false, false);
+        Assert.fail(
+            "One-time listener is not supported, NotImplementedException should be thrown.");
+      } catch (NotImplementedException e) {
+        // expected
+      }
+    }
+  }
+
+  /**
+   * Blocks until {@code listener} has been triggered at least {@code targetCount} times or
+   * {@code timeoutMs} has elapsed, whichever comes first. The caller asserts on the final count,
+   * so a timeout surfaces as an assertion failure rather than a hang.
+   */
+  private void awaitTriggeredCount(MockDataChangeListener listener, int targetCount,
+      long timeoutMs) throws InterruptedException {
+    final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+    synchronized (_syncObject) {
+      // Re-check the condition after every wake-up: wait() may return spuriously or on timeout.
+      while (listener.getTriggeredCount() < targetCount) {
+        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
+        if (remainingMs <= 0) {
+          return;
+        }
+        _syncObject.wait(remainingMs);
+      }
+    }
+  }
+  /**
+   * Registers several listeners on each of two paths and verifies that a single update to one
+   * path notifies {@code count} listeners and that every notification carries the written data.
+   */
+  @Test(dependsOnMethods = "testDataChangeListenerTriggerWithZkWatcher")
+  public void testMultipleDataChangeListeners() throws Exception {
+    final String basePath = "/TestZkMetaClient_testMultipleDataChangeListeners";
+    final int count = 5;
+    final String testData = "test-data";
+    // Flipped to false by any callback that receives unexpected data; never set back to true.
+    final AtomicBoolean dataExpected = new AtomicBoolean(true);
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      Map<String, Set<DataChangeListener>> listeners = new HashMap<>();
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      zkMetaClient.create(basePath + "_1", testData);
+      // create paths
+      for (int i = 0; i < 2; i++) {
+        String path = basePath + "_" + i;
+        listeners.put(path, new HashSet<>());
+        // 5 listeners for each path
+        for (int j = 0; j < count; j++) {
+          DataChangeListener listener = new DataChangeListener() {
+            @Override
+            public void handleDataChange(String key, Object data, ChangeType changeType) {
+              // Record the verdict BEFORE counting down: once the latch opens, the test thread
+              // reads dataExpected immediately, so the result must already be published.
+              if (!testData.equals(data)) {
+                dataExpected.set(false);
+              }
+              countDownLatch.countDown();
+            }
+          };
+          listeners.get(path).add(listener);
+          zkMetaClient.subscribeDataChange(path, listener, false, true);
+        }
+      }
+      // Only the "_1" entry is updated, so exactly its listeners are expected to fire.
+      zkMetaClient.set(basePath + "_1", testData, -1);
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(dataExpected.get());
+    }
+  }
private static ZkMetaClient<String> createZkMetaClient() {
ZkMetaClientConfig config =
@@ -226,4 +321,26 @@ public class TestZkMetaClient {
zkServer.start();
return zkServer;
}
+
+  /**
+   * Test listener that counts how often it was triggered, remembers the most recent change type,
+   * and wakes up threads waiting on the enclosing test's {@code _syncObject}.
+   */
+  private class MockDataChangeListener implements DataChangeListener {
+    private final AtomicInteger _triggeredCount = new AtomicInteger(0);
+    private volatile ChangeType _lastEventType;
+
+    @Override
+    public void handleDataChange(String key, Object data, ChangeType changeType) {
+      synchronized (_syncObject) {
+        // Publish the event type before bumping the counter, and do both under the lock, so a
+        // waiter that observes the new count can never read a stale (or null) event type.
+        _lastEventType = changeType;
+        _triggeredCount.getAndIncrement();
+        _syncObject.notifyAll();
+      }
+    }
+
+    /** Returns how many times {@link #handleDataChange} has been invoked so far. */
+    int getTriggeredCount() {
+      return _triggeredCount.get();
+    }
+
+    /** Returns the change type of the latest notification, or {@code null} if none arrived yet. */
+    ChangeType getLastEventType() {
+      return _lastEventType;
+    }
+  }
}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 4fca90da9..747b7e7a0 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2489,7 +2489,7 @@ public class ZkClient implements Watcher {
});
}
- protected void connect(final long maxMsToWaitUntilConnected)
+ /**
+ * Connects by delegating to the two-argument {@code connect} overload, passing this client
+ * as the second argument.
+ *
+ * @param maxMsToWaitUntilConnected forwarded unchanged to the overload; presumably the maximum
+ * time in milliseconds to wait for the connection (going by its name) -- confirm
+ */
+ public void connect(final long maxMsToWaitUntilConnected)
throws ZkInterruptedException, ZkTimeoutException, IllegalStateException
{
connect(maxMsToWaitUntilConnected, this);
}