This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 757155db3c5f9818816e0f09660d9c07376fab24
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Thu Feb 2 13:31:51 2023 -0500

    Implement direct child change listener in ZkMetaClient
    
    Implements the direct child change listener in ZkMetaClient using the 
native ZkClient.
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 11 ++++-
 .../zk/adapter/DirectChildListenerAdapter.java     | 55 ++++++++++++++++++++++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 38 +++++++++++++--
 3 files changed, 98 insertions(+), 6 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 2eb0da496..fbca7ce5c 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -35,8 +35,10 @@ import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -258,7 +260,12 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
   public DirectChildSubscribeResult subscribeDirectChildChange(String key,
       DirectChildChangeListener listener, boolean skipWatchingNonExistNode,
       boolean persistListener) {
-    return null;
+    if (!persistListener) {
+      throw new NotImplementedException("Currently the non-persist (one-time) 
listener is not supported in ZkMetaClient.");
+    }
+    ChildrenSubscribeResult result =
+        _zkClient.subscribeChildChanges(key, new 
DirectChildListenerAdapter(listener), skipWatchingNonExistNode);
+    return new DirectChildSubscribeResult(result.getChildren(), 
result.isInstalled());
   }
 
   @Override
@@ -280,7 +287,7 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void unsubscribeDirectChildChange(String key, 
DirectChildChangeListener listener) {
-
+    _zkClient.unsubscribeChildChanges(key, new 
DirectChildListenerAdapter(listener));
   }
 
   @Override
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java
new file mode 100644
index 000000000..b65bf5de1
--- /dev/null
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java
@@ -0,0 +1,55 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+public class DirectChildListenerAdapter implements IZkChildListener {
+  private final DirectChildChangeListener _listener;
+
+  public DirectChildListenerAdapter(DirectChildChangeListener listener) {
+    _listener = listener;
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> 
currentChildren) throws Exception {
+    _listener.handleDirectChildChange(parentPath);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DirectChildListenerAdapter that = (DirectChildListenerAdapter) o;
+    return _listener.equals(that._listener);
+  }
+
+  @Override
+  public int hashCode() {
+    return _listener.hashCode();
+  }
+}
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index b4b7aedac..1edda4451 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -25,18 +25,20 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.DataChangeListener;
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
@@ -301,6 +303,34 @@ public class TestZkMetaClient {
     }
   }
 
+  @Test
+  public void testDirectChildChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testDirectChildChangeListener";
+    final int count = 3;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count);
+      DirectChildChangeListener listener = new DirectChildChangeListener() {
+        @Override
+        public void handleDirectChildChange(String key) throws Exception {
+          countDownLatch.countDown();
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeDirectChildChange(basePath, listener, false, 
true)
+              .isRegistered());
+      zkMetaClient.create(basePath + "/child_1", "test-data");
+      //TODO: the native zkclient failed to provide persistent listener, and 
event might be lost.
+      // Remove Thread.sleep() below when the persistent watcher is supported
+      Thread.sleep(500);
+      zkMetaClient.create(basePath + "/child_2", "test-data");
+      Thread.sleep(500);
+      zkMetaClient.create(basePath + "/child_3", "test-data");
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+    }
+  }
+
   // TODO: Create a ZkMetadata test base class and move these helper to base 
class when more tests
   // are added.
   private static ZkMetaClient<String> createZkMetaClient() {

Reply via email to