This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6bd97a98c47e7caee76d3a704ec55aa4732413b5 Author: Grant Paláu Spencer <[email protected]> AuthorDate: Wed Oct 4 13:26:43 2023 -0700 Add recursiveCreate functionality to metaclient (#2607) Co-authored-by: Grant Palau Spencer <[email protected]> --- .../helix/metaclient/api/MetaClientInterface.java | 15 +++++ .../helix/metaclient/impl/zk/ZkMetaClient.java | 65 +++++++++++++++++++++- .../metaclient/impl/zk/util/ZkMetaClientUtil.java | 25 +++++++++ .../metaclient/impl/zk/TestStressZkClient.java | 30 ++++++++++ .../helix/metaclient/impl/zk/TestZkMetaClient.java | 46 ++++++++++++++- 5 files changed, 179 insertions(+), 2 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java index 5b26896a9..5403e7ca4 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java @@ -152,6 +152,21 @@ public interface MetaClientInterface<T> { */ void create(final String key, final T data, final EntryMode mode); + /** + * Create an entry of given EntryMode with given key and data. If any parent node in the node + * hierarchy does not exist, then the parent node will attempt to be created. The entry will not + * be created if there is an existing entry with the same full key. Ephemeral nodes cannot have + * children, so only the final child in the created path will be ephemeral. + */ + void recursiveCreate(final String key, final T Data, final EntryMode mode); + + /** + * Create a TTL entry with given key, data, and expiry time (ttl). If any parent node in the node + * hierarchy does not exist, then the parent node will attempt to be created. The entry will not be created if + * there is an existing entry with the same full key. + */ + void recursiveCreateWithTTL(String key, T data, long ttl); + /** * Create an entry of given EntryMode with given key, data, and expiry time (ttl). * The entry will automatically purge when reached expiry time and has no children. diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 8753747f3..16d28c6d7 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -41,6 +41,7 @@ import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; +import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter; import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter; import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter; @@ -63,6 +64,7 @@ import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.separateIntoUniqueNodePaths; import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException; @@ -104,7 +106,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } @Override - public void create(String key, Object data, MetaClientInterface.EntryMode mode) { + public void create(String key, Object data, EntryMode mode) { try { _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode)); @@ -115,6 +117,67 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } } + @Override + public void recursiveCreate(String key, T data, EntryMode mode) { + // Function named recursiveCreate to match naming scheme, but actual work is iterative + iterativeCreate(key, data, mode, -1); + } + + @Override + public void recursiveCreateWithTTL(String key, T data, long ttl) { + iterativeCreate(key, data, EntryMode.TTL, ttl); + } + + private void iterativeCreate(String key, T data, EntryMode mode, long ttl) { + List<String> nodePaths = separateIntoUniqueNodePaths(key); + int i = 0; + // Ephemeral nodes cant have children, so change mode when creating parents + EntryMode parentMode = (EntryMode.EPHEMERAL.equals(mode) ? + EntryMode.PERSISTENT : mode); + + // Iterate over paths, starting with full key then attempting each successive parent + // Try /a/b/c, if parent /a/b, does not exist, then try to create parent, etc.. + while (i < nodePaths.size()) { + // If parent exists or there is no parent node, then try to create the node + // and break out of loop on successful create + if (i == nodePaths.size() - 1 || _zkClient.exists(nodePaths.get(i+1))) { + try { + if (EntryMode.TTL.equals(mode)) { + createWithTTL(nodePaths.get(i), data, ttl); + } else { + create(nodePaths.get(i), data, i == 0 ? mode : parentMode); + } + // Race condition may occur where a node is created by another thread in between loops. + // We should not throw error if this occurs for parent nodes, only for the full node path. + } catch (MetaClientNodeExistsException e) { + if (i == 0) { + throw e; + } + } + break; + // Else try to create parent in next loop iteration + } else { + i++; + } + } + + // Reattempt creation of children that failed due to parent not existing + while (--i >= 0) { + try { + if (EntryMode.TTL.equals(mode)) { + createWithTTL(nodePaths.get(i), data, ttl); + } else { + create(nodePaths.get(i), data, i == 0 ? mode : parentMode); + } + // Catch same race condition as above + } catch (MetaClientNodeExistsException e) { + if (i == 0) { + throw e; + } + } + } + } + @Override public void createWithTTL(String key, T data, long ttl) { try { diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java index f93e98919..aee0c698a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java @@ -20,6 +20,7 @@ package org.apache.helix.metaclient.impl.zk.util; */ import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; @@ -358,4 +359,28 @@ public class ZkMetaClientUtil { return MetaClientException.ReturnCode.DB_USER_ERROR; } } + + // Returns null if no parent path + public static String getZkParentPath(String path) { + int idx = path.lastIndexOf('/'); + return idx == 0 ? null : path.substring(0, idx); + } + + // Splits a path into the paths for each node along the way. + // /a/b/c --> /a/b/c, /a/b, /a + public static List<String> separateIntoUniqueNodePaths(String path) { + if (path == null || "/".equals(path)) { + return null; + } + + String[] subPath = path.split("/"); + String[] nodePaths = new String[subPath.length-1]; + StringBuilder tempPath = new StringBuilder(); + for (int i = 1; i < subPath.length; i++) { + tempPath.append( "/"); + tempPath.append(subPath[i]); + nodePaths[subPath.length - 1 - i] = tempPath.toString(); + } + return Arrays.asList(nodePaths); + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java index 6f358f0e8..284ce3645 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.metaclient.api.*; import org.apache.helix.metaclient.datamodel.DataRecord; import org.apache.helix.metaclient.exception.MetaClientException; +import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.recipes.lock.DataRecordSerializer; import org.testng.Assert; @@ -129,6 +130,35 @@ public class TestStressZkClient extends ZkMetaClientTestBase { Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); } + @Test + public void testRecursiveCreate() { + final String zkParentKey = "/stressZk_testRecursiveCreate"; + _zkMetaClient.create(zkParentKey, ENTRY_STRING_VALUE); + + int count = (int) Math.pow(TEST_ITERATION_COUNT, 1/3d); + for (int i = 0; i < count; i++) { + + for (int j = 0; j < count; j++) { + + for (int k = 0; k < count; k++) { + String key = zkParentKey + "/" + i + "/" + j + "/" + k; + _zkMetaClient.recursiveCreate(key, String.valueOf(k), PERSISTENT); + Assert.assertEquals(String.valueOf(k), _zkMetaClient.get(key)); + } + } + try { + _zkMetaClient.recursiveCreate(zkParentKey + "/" + i, "should_fail", PERSISTENT); + Assert.fail("Should have failed due to node existing"); + } catch (MetaClientNodeExistsException ignoredException) { + } + } + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + + } + @Test public void testGet() { final String zkParentKey = "/stressZk_testGet"; diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java index 86edbf8eb..a5da69f2f 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java @@ -26,6 +26,8 @@ import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.api.DirectChildChangeListener; +import java.io.StringWriter; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -38,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.NotImplementedException; -import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; import org.apache.helix.metaclient.api.Op; import org.apache.helix.metaclient.api.OpResult; @@ -50,6 +51,7 @@ import org.testng.annotations.Test; import static org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE; import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER; +import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.EPHEMERAL; import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT; @@ -97,6 +99,48 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{ } } + @Test + public void testRecursiveCreate() { + final String path = "/Test/ZkMetaClient/_fullPath"; + + + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + MetaClientInterface.EntryMode mode = EPHEMERAL; + + // Should succeed even if one of the parent nodes exists + String extendedPath = "/A" + path; + zkMetaClient.create("/A", ENTRY_STRING_VALUE, PERSISTENT); + zkMetaClient.recursiveCreate(extendedPath, ENTRY_STRING_VALUE, mode); + Assert.assertNotNull(zkMetaClient.exists(extendedPath)); + + // Should succeed if no parent nodes exist + zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode); + Assert.assertNotNull(zkMetaClient.exists(path)); + Assert.assertEquals(zkMetaClient.getDataAndStat("/Test").getRight().getEntryType(), PERSISTENT); + Assert.assertEquals(zkMetaClient.getDataAndStat(path).getRight().getEntryType(), mode); + + // Should throw NodeExistsException if child node exists + zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode); + Assert.fail("Should have failed due to node already created"); + } catch (MetaClientException e) { + Assert.assertEquals(e.getMessage(), "org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /Test/ZkMetaClient/_fullPath"); + System.out.println(e.getMessage()); + } + + } + + @Test + public void testRecursiveCreateWithTTL() { + final String path = "/Test/ZkMetaClient/_fullPath/withTTL"; + + try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) { + zkMetaClient.connect(); + zkMetaClient.recursiveCreateWithTTL(path, ENTRY_STRING_VALUE, 1000); + Assert.assertNotNull(zkMetaClient.exists(path)); + } + } + @Test public void testRenewTTL() { final String key = "/TestZkMetaClient_testRenewTTL_1";
