This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new a3a3e2b6f Lattice Children Cache Implementation(#2623)
a3a3e2b6f is described below
commit a3a3e2b6ff0192d990436836436b8cb3b272813d
Author: Marcos Rico Peng <[email protected]>
AuthorDate: Thu Oct 5 17:23:49 2023 -0700
Lattice Children Cache Implementation(#2623)
Co-authored-by: mapeng <[email protected]>
---
.../metaclient/api/MetaClientCacheInterface.java | 35 +++++-
.../factories/MetaClientCacheConfig.java | 7 +-
.../metaclient/impl/zk/ZkMetaClientCache.java | 53 +++++++--
.../metaclient/impl/zk/TestZkMetaClientCache.java | 124 ++++++++++++++++-----
4 files changed, 174 insertions(+), 45 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
index 6449970bb..3630e1c39 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -30,10 +30,8 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
class TrieNode {
// A mapping between trie key and children nodes.
private Map<String, TrieNode> _children;
-
// the complete path/prefix leading to the current node.
private final String _path;
-
private final String _nodeKey;
public TrieNode(String path, String nodeKey) {
@@ -54,8 +52,39 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
return _nodeKey;
}
- public void addChild(String key, TrieNode node) {
+ public void addChild(String key, TrieNode node) {
_children.put(key, node);
}
+
+ public TrieNode processPath(String path, boolean isCreate) {
+ String[] pathComponents = path.split("/");
+ TrieNode currentNode = this;
+ TrieNode previousNode = null;
+
+ for (int i = 1; i < pathComponents.length; i++) {
+ String component = pathComponents[i];
+ if (component.equals(_nodeKey)) {
+ // Skip the root node
+ } else if (!currentNode.getChildren().containsKey(component)) {
+ if (isCreate) {
+ TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component);
+ currentNode.addChild(component, newNode);
+ previousNode = currentNode;
+ currentNode = newNode;
+ } else {
+ return currentNode;
+ }
+ } else {
+ previousNode = currentNode;
+ currentNode = currentNode.getChildren().get(component);
+ }
+ }
+
+ if (!isCreate && previousNode != null) {
+ previousNode.getChildren().remove(currentNode.getNodeKey());
+ }
+
+ return currentNode;
+ }
}
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 07972945a..2c23a0a68 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -23,10 +23,10 @@ package org.apache.helix.metaclient.factories;
public class MetaClientCacheConfig {
private final String _rootEntry;
- private boolean _cacheData = false;
- private boolean _cacheChildren = false;
+ private final boolean _cacheData;
+ private final boolean _cacheChildren;
- public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
+ public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
@@ -43,5 +43,4 @@ public class MetaClientCacheConfig {
public boolean getCacheChildren() {
return _cacheChildren;
}
-
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
index 66eb6f91e..45701063c 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -70,7 +70,7 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
- _childrenCacheTree = new TrieNode(_rootEntry, null);
+ _childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1));
}
}
@@ -102,14 +102,46 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
return dataList;
}
+ /**
+ * Get the direct children for a given key.
+ * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
+ * a parent key,
+ * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
+ * be a prefix key.
+ * @return list of direct children or null if key doesn't exist / cache is not populated yet.
+ */
@Override
public List<String> getDirectChildrenKeys(final String key) {
- throw new MetaClientException("Not implemented yet.");
+ if (_cacheChildren) {
+ TrieNode node = _childrenCacheTree.processPath(key, true);
+ if (node == null) {
+ LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
+ return null;
+ }
+ return List.copyOf(node.getChildren().keySet());
+ }
+ return super.getDirectChildrenKeys(key);
}
+ /**
+ * Get the number of direct children for a given key.
+ * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
+ * a parent key,
+ * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
+ * be a prefix key.
+ * @return number of direct children or 0 if key doesn't exist / has no children / cache is not populated yet.
+ */
@Override
public int countDirectChildren(final String key) {
- throw new MetaClientException("Not implemented yet.");
+ if (_cacheChildren) {
+ TrieNode node = _childrenCacheTree.processPath(key, true);
+ if (node == null) {
+ LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
+ return 0;
+ }
+ return node.getChildren().size();
+ }
+ return super.countDirectChildren(key);
}
private void populateAllCache() {
@@ -130,7 +162,13 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
T dataRecord = _cacheClient.readData(node, true);
_dataCacheMap.put(node, dataRecord);
}
- queue.addAll(_cacheClient.getChildren(node));
+ if (_cacheChildren) {
+ _childrenCacheTree.processPath(node, true);
+ }
+ List<String> childNodes = _cacheClient.getChildren(node);
+ for (String child : childNodes) {
+ queue.add(node + "/" + child); // Add child nodes to the queue with their full path.
+ }
}
// Let the other threads know that the cache is populated.
_initializedCache.countDown();
@@ -151,11 +189,11 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
// TODO: HANDLE DEDUP EVENT CHANGES
switch (changeType) {
case ENTRY_CREATED:
- // Not implemented yet.
+ _childrenCacheTree.processPath(path, true);
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
- // Not implemented yet.
+ _childrenCacheTree.processPath(path, false);
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
@@ -190,9 +228,6 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
return _dataCacheMap;
}
- public TrieNode getChildrenCacheTree() {
- return _childrenCacheTree;
- }
/**
* Connect to the underlying ZkClient.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
index 30a0a729d..2950a652e 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -19,13 +19,13 @@ package org.apache.helix.metaclient.impl.zk;
* under the License.
*/
+import org.apache.helix.metaclient.MetaClientTestUtil;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
public class TestZkMetaClientCache extends ZkMetaClientTestBase {
private static final String DATA_PATH = "/data";
@@ -48,32 +48,55 @@ public class TestZkMetaClientCache extends ZkMetaClientTestBase {
zkMetaClientCache.connect();
zkMetaClientCache.create(key, "test");
zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
-
// Get data for DATA_PATH and cache it
String data = zkMetaClientCache.get(key + DATA_PATH);
Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+ Assert.assertTrue(MetaClientTestUtil.verify(() ->
+ (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), data)), MetaClientTestUtil.WAIT_DURATION));
// Update data for DATA_PATH
String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");
- // Verify that cached data is updated. Might take some time
- for (int i = 0; i < 10; i++) {
- if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
- break;
- }
- Thread.sleep(1000);
- }
- Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+ Assert.assertTrue(MetaClientTestUtil.verify(() ->
+ (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), newData)), MetaClientTestUtil.WAIT_DURATION));
zkMetaClientCache.delete(key + DATA_PATH);
- // Verify that cached data is updated. Might take some time
- for (int i = 0; i < 10; i++) {
- if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) {
- break;
- }
- Thread.sleep(1000);
- }
- } catch (InterruptedException e) {
+ Assert.assertTrue(MetaClientTestUtil.verify(() ->
+ (Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), null)), MetaClientTestUtil.WAIT_DURATION));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testGetDirectChildrenKeys() {
+ final String key = "/testGetDirectChildrenKeys";
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+ zkMetaClientCache.connect();
+ zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+ zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+ zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() ->
+ (zkMetaClientCache.getDirectChildrenKeys(key).size() == 2), MetaClientTestUtil.WAIT_DURATION));
+
+ Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child1"));
+ Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child2"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testCountDirectChildren() {
+ final String key = "/testCountDirectChildren";
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+ zkMetaClientCache.connect();
+ zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+ zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+ zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == 2), MetaClientTestUtil.WAIT_DURATION));
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -94,25 +117,68 @@ public class TestZkMetaClientCache extends ZkMetaClientTestBase {
values.add("test");
values.add(DATA_VALUE);
- for (int i = 0; i < 10; i++) {
- // Get data for DATA_PATH and cache it
- List<String> data = zkMetaClientCache.get(keys);
- if (data.equals(values)) {
- break;
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(keys).equals(values)), MetaClientTestUtil.WAIT_DURATION));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testLargeClusterLoading() {
+ final String key = "/testLargerNodes";
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+
+ int numLayers = 4;
+ int numNodesPerLayer = 20;
+
+ // Create the root node
+ zkMetaClient.create(key, "test");
+
+ Queue<String> queue = new LinkedList<>();
+ queue.offer(key);
+
+ for (int layer = 1; layer <= numLayers; layer++) {
+ int nodesAtThisLayer = Math.min(numNodesPerLayer, queue.size() * numNodesPerLayer);
+
+ for (int i = 0; i < nodesAtThisLayer; i++) {
+ String parentKey = queue.poll();
+ for (int j = 0; j < numNodesPerLayer; j++) {
+ String newNodeKey = parentKey + "/node" + j;
+ zkMetaClient.create(newNodeKey, "test");
+ queue.offer(newNodeKey);
+ }
}
- Thread.sleep(1000);
}
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+ zkMetaClientCache.connect();
+
+ // Assert Checks on a Random Path
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals("test")), MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer), MetaClientTestUtil.WAIT_DURATION));
+ String newData = zkMetaClientCache.update(key + "/node4/node1", currentData -> currentData + "1");
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> ( zkMetaClientCache.get(key + "/node4/node1").equals(newData)), MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() ->
+ (zkMetaClientCache.getDirectChildrenKeys(key + "/node4/node1")
+ .equals(zkMetaClient.getDirectChildrenKeys(key + "/node4/node1"))), MetaClientTestUtil.WAIT_DURATION));
+
+ zkMetaClientCache.delete(key + "/node4/node1");
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> (Objects.equals(zkMetaClientCache.get(key + "/node4/node1"), null)), MetaClientTestUtil.WAIT_DURATION));
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
- protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
+
+ public ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
- MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
+ MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}