This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new a3a3e2b6f Lattice Children Cache Implementation(#2623)
a3a3e2b6f is described below

commit a3a3e2b6ff0192d990436836436b8cb3b272813d
Author: Marcos Rico Peng <[email protected]>
AuthorDate: Thu Oct 5 17:23:49 2023 -0700

    Lattice Children Cache Implementation(#2623)
    
    Co-authored-by: mapeng <[email protected]>
---
 .../metaclient/api/MetaClientCacheInterface.java   |  35 +++++-
 .../factories/MetaClientCacheConfig.java           |   7 +-
 .../metaclient/impl/zk/ZkMetaClientCache.java      |  53 +++++++--
 .../metaclient/impl/zk/TestZkMetaClientCache.java  | 124 ++++++++++++++++-----
 4 files changed, 174 insertions(+), 45 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
index 6449970bb..3630e1c39 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -30,10 +30,8 @@ public interface MetaClientCacheInterface<T> extends 
MetaClientInterface<T> {
     class TrieNode {
         // A mapping between trie key and children nodes.
         private Map<String, TrieNode> _children;
-
         // the complete path/prefix leading to the current node.
         private final String _path;
-
         private final String _nodeKey;
 
         public TrieNode(String path, String nodeKey) {
@@ -54,8 +52,39 @@ public interface MetaClientCacheInterface<T> extends 
MetaClientInterface<T> {
             return _nodeKey;
         }
 
-        public void addChild(String key,  TrieNode node) {
+        public void addChild(String key, TrieNode node) {
             _children.put(key, node);
         }
+
+        public TrieNode processPath(String path, boolean isCreate) {
+            String[] pathComponents = path.split("/");
+            TrieNode currentNode = this;
+            TrieNode previousNode = null;
+
+            for (int i = 1; i < pathComponents.length; i++) {
+                String component = pathComponents[i];
+                if (component.equals(_nodeKey)) {
+                    // Component equals this node's own key (the root entry): stay on the current node
+                } else if (!currentNode.getChildren().containsKey(component)) {
+                    if (isCreate) {
+                        TrieNode newNode = new TrieNode(currentNode.getPath() 
+ "/" + component, component);
+                        currentNode.addChild(component, newNode);
+                        previousNode = currentNode;
+                        currentNode = newNode;
+                    } else {
+                        return currentNode;
+                    }
+                } else {
+                    previousNode = currentNode;
+                    currentNode = currentNode.getChildren().get(component);
+                }
+            }
+
+            if (!isCreate && previousNode != null) {
+                previousNode.getChildren().remove(currentNode.getNodeKey());
+            }
+
+            return currentNode;
+        }
     }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 07972945a..2c23a0a68 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -23,10 +23,10 @@ package org.apache.helix.metaclient.factories;
 
 public class MetaClientCacheConfig {
     private final String _rootEntry;
-    private boolean _cacheData = false;
-    private boolean _cacheChildren = false;
+    private final boolean _cacheData;
+    private final boolean _cacheChildren;
 
-    public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean 
cacheChildren, boolean lazyCaching) {
+    public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean 
cacheChildren) {
         _rootEntry = rootEntry;
         _cacheData = cacheData;
         _cacheChildren = cacheChildren;
@@ -43,5 +43,4 @@ public class MetaClientCacheConfig {
     public boolean getCacheChildren() {
         return _cacheChildren;
     }
-
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
index 66eb6f91e..45701063c 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -70,7 +70,7 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> 
implements MetaClientC
             _dataCacheMap = new ConcurrentHashMap<>();
         }
         if (_cacheChildren) {
-            _childrenCacheTree = new TrieNode(_rootEntry, null);
+            _childrenCacheTree = new TrieNode(_rootEntry, 
_rootEntry.substring(1));
         }
     }
 
@@ -102,14 +102,46 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> 
implements MetaClientC
         return dataList;
     }
 
+    /**
+     * Get the direct children for a given key.
+     * @param key For metadata storage that has hierarchical key space (e.g. 
ZK), the key would be
+     *            a parent key,
+     *            For metadata storage that has non-hierarchical key space 
(e.g. etcd), the key would
+     *            be a prefix key.
+     * @return list of direct children or null if key doesn't exist / cache is 
not populated yet.
+     */
     @Override
     public List<String> getDirectChildrenKeys(final String key) {
-        throw new MetaClientException("Not implemented yet.");
+        if (_cacheChildren) {
+            TrieNode node = _childrenCacheTree.processPath(key, true);
+            if (node == null) {
+                LOG.debug("Children not found in cache for key: {}. This could 
be because the cache is still being populated.", key);
+                return null;
+            }
+            return List.copyOf(node.getChildren().keySet());
+        }
+        return super.getDirectChildrenKeys(key);
     }
 
+    /**
+     * Get the number of direct children for a given key.
+     * @param key For metadata storage that has hierarchical key space (e.g. 
ZK), the key would be
+     *            a parent key,
+     *            For metadata storage that has non-hierarchical key space 
(e.g. etcd), the key would
+     *            be a prefix key.
+     * @return number of direct children or 0 if key doesn't exist / has no 
children / cache is not populated yet.
+     */
     @Override
     public int countDirectChildren(final String key) {
-        throw new MetaClientException("Not implemented yet.");
+        if (_cacheChildren) {
+            TrieNode node = _childrenCacheTree.processPath(key, true);
+            if (node == null) {
+                LOG.debug("Children not found in cache for key: {}. This could 
be because the cache is still being populated.", key);
+                return 0;
+            }
+            return node.getChildren().size();
+        }
+        return super.countDirectChildren(key);
     }
 
     private void populateAllCache() {
@@ -130,7 +162,13 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> 
implements MetaClientC
                 T dataRecord = _cacheClient.readData(node, true);
                 _dataCacheMap.put(node, dataRecord);
             }
-            queue.addAll(_cacheClient.getChildren(node));
+            if (_cacheChildren) {
+                _childrenCacheTree.processPath(node, true);
+            }
+            List<String> childNodes = _cacheClient.getChildren(node);
+            for (String child : childNodes) {
+                queue.add(node + "/" + child); // Add child nodes to the queue 
with their full path.
+            }
         }
         // Let the other threads know that the cache is populated.
         _initializedCache.countDown();
@@ -151,11 +189,11 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> 
implements MetaClientC
             //  TODO: HANDLE DEDUP EVENT CHANGES
             switch (changeType) {
                 case ENTRY_CREATED:
-                    // Not implemented yet.
+                    _childrenCacheTree.processPath(path, true);
                     modifyDataInCache(path, false);
                     break;
                 case ENTRY_DELETED:
-                    // Not implemented yet.
+                    _childrenCacheTree.processPath(path, false);
                     modifyDataInCache(path, true);
                     break;
                 case ENTRY_DATA_CHANGE:
@@ -190,9 +228,6 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> 
implements MetaClientC
         return _dataCacheMap;
     }
 
-    public TrieNode getChildrenCacheTree() {
-        return _childrenCacheTree;
-    }
 
     /**
      * Connect to the underlying ZkClient.
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
index 30a0a729d..2950a652e 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -19,13 +19,13 @@ package org.apache.helix.metaclient.impl.zk;
  * under the License.
  */
 
+import org.apache.helix.metaclient.MetaClientTestUtil;
 import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 public class TestZkMetaClientCache extends ZkMetaClientTestBase {
     private static final String DATA_PATH = "/data";
@@ -48,32 +48,55 @@ public class TestZkMetaClientCache extends 
ZkMetaClientTestBase {
             zkMetaClientCache.connect();
             zkMetaClientCache.create(key, "test");
             zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
-
             // Get data for DATA_PATH and cache it
             String data = zkMetaClientCache.get(key + DATA_PATH);
             Assert.assertEquals(data, 
zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
data)), MetaClientTestUtil.WAIT_DURATION));
 
             // Update data for DATA_PATH
             String newData = zkMetaClientCache.update(key + DATA_PATH, 
currentData -> currentData + "1");
 
-            // Verify that cached data is updated. Might take some time
-            for (int i = 0; i < 10; i++) {
-                if (zkMetaClientCache.getDataCacheMap().get(key + 
DATA_PATH).equals(newData)) {
-                    break;
-                }
-                Thread.sleep(1000);
-            }
-            Assert.assertEquals(newData, 
zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
newData)), MetaClientTestUtil.WAIT_DURATION));
 
             zkMetaClientCache.delete(key + DATA_PATH);
-            // Verify that cached data is updated. Might take some time
-            for (int i = 0; i < 10; i++) {
-                if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) 
== null) {
-                    break;
-                }
-                Thread.sleep(1000);
-            }
-        } catch (InterruptedException e) {
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
null)), MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testGetDirectChildrenKeys() {
+        final String key = "/testGetDirectChildrenKeys";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    (zkMetaClientCache.getDirectChildrenKeys(key).size() == 
2), MetaClientTestUtil.WAIT_DURATION));
+
+            
Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child1"));
+            
Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child2"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testCountDirectChildren() {
+        final String key = "/testCountDirectChildren";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+            Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.countDirectChildren(key) == 2), 
MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
@@ -94,25 +117,68 @@ public class TestZkMetaClientCache extends 
ZkMetaClientTestBase {
             values.add("test");
             values.add(DATA_VALUE);
 
-            for (int i = 0; i < 10; i++) {
-                // Get data for DATA_PATH and cache it
-                List<String> data = zkMetaClientCache.get(keys);
-                if (data.equals(values)) {
-                    break;
+            Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(keys).equals(values)), MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testLargeClusterLoading() {
+        final String key = "/testLargerNodes";
+        try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+            zkMetaClient.connect();
+
+            int numLayers = 4;
+            int numNodesPerLayer = 20;
+
+            // Create the root node
+            zkMetaClient.create(key, "test");
+
+            Queue<String> queue = new LinkedList<>();
+            queue.offer(key);
+
+            for (int layer = 1; layer <= numLayers; layer++) {
+                int nodesAtThisLayer = Math.min(numNodesPerLayer, queue.size() 
* numNodesPerLayer);
+
+                for (int i = 0; i < nodesAtThisLayer; i++) {
+                    String parentKey = queue.poll();
+                    for (int j = 0; j < numNodesPerLayer; j++) {
+                        String newNodeKey = parentKey + "/node" + j;
+                        zkMetaClient.create(newNodeKey, "test");
+                        queue.offer(newNodeKey);
+                    }
                 }
-                Thread.sleep(1000);
             }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+
+            try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+                zkMetaClientCache.connect();
+
+                // Assert checks on a fixed sample path (key + "/node4/node1")
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(key + "/node4/node1").equals("test")), 
MetaClientTestUtil.WAIT_DURATION));
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer), 
MetaClientTestUtil.WAIT_DURATION));
+                String newData = zkMetaClientCache.update(key + 
"/node4/node1", currentData -> currentData + "1");
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(key + "/node4/node1").equals(newData)), 
MetaClientTestUtil.WAIT_DURATION));
+                Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                        (zkMetaClientCache.getDirectChildrenKeys(key + 
"/node4/node1")
+                                .equals(zkMetaClient.getDirectChildrenKeys(key 
+ "/node4/node1"))), MetaClientTestUtil.WAIT_DURATION));
+
+                zkMetaClientCache.delete(key + "/node4/node1");
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> 
(Objects.equals(zkMetaClientCache.get(key + "/node4/node1"), null)), 
MetaClientTestUtil.WAIT_DURATION));
+
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
-    protected static ZkMetaClientCache<String> 
createZkMetaClientCacheLazyCaching(String rootPath) {
+
+    public ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String 
rootPath) {
         ZkMetaClientConfig config =
                 new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
                         //.setZkSerializer(new TestStringSerializer())
                         .build();
-        MetaClientCacheConfig cacheConfig = new 
MetaClientCacheConfig(rootPath, true, true, true);
+        MetaClientCacheConfig cacheConfig = new 
MetaClientCacheConfig(rootPath, true, true);
         return new ZkMetaClientCache<>(config, cacheConfig);
     }
 }

Reply via email to