This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 757155db3c5f9818816e0f09660d9c07376fab24 Author: Qi (Quincy) Qu <[email protected]> AuthorDate: Thu Feb 2 13:31:51 2023 -0500 Implement direct child change listener in ZkMetaClient Implements the direct child change listener in ZkMetaClient using the native ZkClient. --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 11 ++++- .../zk/adapter/DirectChildListenerAdapter.java | 55 ++++++++++++++++++++++ .../helix/metaclient/impl/zk/TestZkMetaClient.java | 38 +++++++++++++-- 3 files changed, 98 insertions(+), 6 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 2eb0da496..fbca7ce5c 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -35,8 +35,10 @@ import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter; +import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil; +import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.exception.ZkException; @@ -258,7 +260,12 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { public DirectChildSubscribeResult subscribeDirectChildChange(String key, DirectChildChangeListener listener, boolean skipWatchingNonExistNode, boolean persistListener) { - return null; + if (!persistListener) { + throw new NotImplementedException("Currently the non-persist (one-time) listener is not supported in ZkMetaClient."); + } + ChildrenSubscribeResult result = + _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode); + return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled()); } @Override @@ -280,7 +287,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void unsubscribeDirectChildChange(String key, DirectChildChangeListener listener) { - + _zkClient.unsubscribeChildChanges(key, new DirectChildListenerAdapter(listener)); } @Override diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java new file mode 100644 index 000000000..b65bf5de1 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DirectChildListenerAdapter.java @@ -0,0 +1,55 @@ +package org.apache.helix.metaclient.impl.zk.adapter; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import org.apache.helix.metaclient.api.DirectChildChangeListener; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; + + +public class DirectChildListenerAdapter implements IZkChildListener { + private final DirectChildChangeListener _listener; + + public DirectChildListenerAdapter(DirectChildChangeListener listener) { + _listener = listener; + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception { + _listener.handleDirectChildChange(parentPath); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DirectChildListenerAdapter that = (DirectChildListenerAdapter) o; + return _listener.equals(that._listener); + } + + @Override + public int hashCode() { + return _listener.hashCode(); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index b4b7aedac..1edda4451 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -25,18 +25,20 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.helix.metaclient.api.DataUpdater; +import org.apache.helix.metaclient.api.DirectChildChangeListener; +import org.apache.helix.metaclient.api.MetaClientInterface; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.metaclient.api.DataChangeListener; -import org.apache.helix.metaclient.api.DataUpdater; -import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; @@ -301,6 +303,34 @@ public class TestZkMetaClient { } } + @Test + public void testDirectChildChangeListener() throws Exception { + final String basePath = "/TestZkMetaClient_testDirectChildChangeListener"; + final int count = 3; + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + CountDownLatch countDownLatch = new CountDownLatch(count); + DirectChildChangeListener listener = new DirectChildChangeListener() { + @Override + public void handleDirectChildChange(String key) throws Exception { + countDownLatch.countDown(); + } + }; + zkMetaClient.create(basePath, ""); + Assert.assertTrue( + zkMetaClient.subscribeDirectChildChange(basePath, listener, false, true) + .isRegistered()); + zkMetaClient.create(basePath + "/child_1", "test-data"); + //TODO: the native zkclient failed to provide persistent listener, and event might be lost. + // Remove Thread.sleep() below when the persistent watcher is supported + Thread.sleep(500); + zkMetaClient.create(basePath + "/child_2", "test-data"); + Thread.sleep(500); + zkMetaClient.create(basePath + "/child_3", "test-data"); + Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS)); + } + } + // TODO: Create a ZkMetadata test base class and move these helper to base class when more tests // are added. private static ZkMetaClient<String> createZkMetaClient() {
