This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 125111264 Implement direct child change listener in ZkMetaClient
125111264 is described below
commit 125111264073a31846a61f71ea1c9721ea5a6ec2
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Thu Feb 2 13:31:51 2023 -0500
Implement direct child change listener in ZkMetaClient
Implements the direct child change listener in ZkMetaClient using the
native ZkClient.
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 11 ++++-
.../zk/adapter/DirectChildListenerAdapter.java | 55 ++++++++++++++++++++++
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 38 +++++++++++++--
3 files changed, 98 insertions(+), 6 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 2eb0da496..fbca7ce5c 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -35,8 +35,10 @@ import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -258,7 +260,12 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
public DirectChildSubscribeResult subscribeDirectChildChange(String key,
DirectChildChangeListener listener, boolean skipWatchingNonExistNode,
boolean persistListener) {
- return null;
+ if (!persistListener) {
+ throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient.");
+ }
+ ChildrenSubscribeResult result =
+ _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode);
+ return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled());
}
@Override
@@ -280,7 +287,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public void unsubscribeDirectChildChange(String key,
DirectChildChangeListener listener) {
-
+ _zkClient.unsubscribeChildChanges(key, new DirectChildListenerAdapter(listener));
}
@Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java
new file mode 100644
index 000000000..b65bf5de1
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java
@@ -0,0 +1,55 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+public class DirectChildListenerAdapter implements IZkChildListener {
+ private final DirectChildChangeListener _listener;
+
+ public DirectChildListenerAdapter(DirectChildChangeListener listener) {
+ _listener = listener;
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+ _listener.handleDirectChildChange(parentPath);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DirectChildListenerAdapter that = (DirectChildListenerAdapter) o;
+ return _listener.equals(that._listener);
+ }
+
+ @Override
+ public int hashCode() {
+ return _listener.hashCode();
+ }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index b4b7aedac..1edda4451 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -25,18 +25,20 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.DataChangeListener;
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
@@ -301,6 +303,34 @@ public class TestZkMetaClient {
}
}
+ @Test
+ public void testDirectChildChangeListener() throws Exception {
+ final String basePath = "/TestZkMetaClient_testDirectChildChangeListener";
+ final int count = 3;
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ CountDownLatch countDownLatch = new CountDownLatch(count);
+ DirectChildChangeListener listener = new DirectChildChangeListener() {
+ @Override
+ public void handleDirectChildChange(String key) throws Exception {
+ countDownLatch.countDown();
+ }
+ };
+ zkMetaClient.create(basePath, "");
+ Assert.assertTrue(
+ zkMetaClient.subscribeDirectChildChange(basePath, listener, false, true)
+ .isRegistered());
+ zkMetaClient.create(basePath + "/child_1", "test-data");
+ //TODO: the native zkclient failed to provide persistent listener, and event might be lost.
+ // Remove Thread.sleep() below when the persistent watcher is supported
+ Thread.sleep(500);
+ zkMetaClient.create(basePath + "/child_2", "test-data");
+ Thread.sleep(500);
+ zkMetaClient.create(basePath + "/child_3", "test-data");
+ Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+ }
+ }
+
// TODO: Create a ZkMetadata test base class and move these helper to base class when more tests
// are added.
private static ZkMetaClient<String> createZkMetaClient() {