This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f1a3f716 Merge feature branch (#2647)
5f1a3f716 is described below

commit 5f1a3f716ec71178f2f1f98c2ef48c62d6285df5
Author: xyuanlu <[email protected]>
AuthorDate: Mon Oct 16 15:24:52 2023 -0700

    Merge feature branch (#2647)
    
    * MetaClientCache Part 1 - API's, configs, and builders (#2612)
    
    MetaClientCache Part 1 - API's, configs, and builders
    
    ---------
    
    Co-authored-by: mapeng <[email protected]>
    
    * Skip one time listener re-register for exists for ZkClient - MetaClient 
usage. (#2637)
    
    * Lattice cache - caching just data implementation (#2619)
    
    Lattice cache - caching just data implementation
    ---------
    
    Co-authored-by: mapeng <[email protected]>
    
    * Add recursiveCreate functionality to metaclient (#2607)
    
    
    
    Co-authored-by: Grant Palau Spencer <[email protected]>
    
    * Lattice Children Cache Implementation(#2623)
    
    Co-authored-by: mapeng <[email protected]>
    
    ---------
    
    Co-authored-by: Marcos Rico Peng 
<[email protected]>
    Co-authored-by: mapeng <[email protected]>
    Co-authored-by: Grant Paláu Spencer 
<[email protected]>
    Co-authored-by: Grant Palau Spencer <[email protected]>
---
 .../metaclient/api/MetaClientCacheInterface.java   |  90 ++++++++
 .../helix/metaclient/api/MetaClientInterface.java  |  15 ++
 .../MetaClientCacheConfig.java}                    |  37 ++--
 .../metaclient/factories/MetaClientFactory.java    |  28 ++-
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  65 +++++-
 .../metaclient/impl/zk/ZkMetaClientCache.java      | 246 +++++++++++++++++++++
 .../impl/zk/factory/ZkMetaClientFactory.java       |  15 ++
 .../metaclient/impl/zk/util/ZkMetaClientUtil.java  |  25 +++
 .../metaclient/impl/zk/TestStressZkClient.java     |  30 +++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  56 ++++-
 .../metaclient/impl/zk/TestZkMetaClientCache.java  | 184 +++++++++++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |   2 +-
 12 files changed, 767 insertions(+), 26 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
new file mode 100644
index 000000000..3630e1c39
--- /dev/null
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -0,0 +1,90 @@
+package org.apache.helix.metaclient.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
+
+    /**
+     * TrieNode class to store the children of the entries to be cached.
+     */
+    class TrieNode {
+        // A mapping between trie key and children nodes.
+        private Map<String, TrieNode> _children;
+        // the complete path/prefix leading to the current node.
+        private final String _path;
+        private final String _nodeKey;
+
+        public TrieNode(String path, String nodeKey) {
+            _path = path;
+            _nodeKey = nodeKey;
+            _children = new HashMap<>();
+        }
+
+        public Map<String, TrieNode> getChildren() {
+            return _children;
+        }
+
+        public String getPath() {
+            return _path;
+        }
+
+        public String getNodeKey() {
+            return _nodeKey;
+        }
+
+        public void addChild(String key, TrieNode node) {
+            _children.put(key, node);
+        }
+
+        public TrieNode processPath(String path, boolean isCreate) {
+            String[] pathComponents = path.split("/");
+            TrieNode currentNode = this;
+            TrieNode previousNode = null;
+
+            for (int i = 1; i < pathComponents.length; i++) {
+                String component = pathComponents[i];
+                if (component.equals(_nodeKey)) {
+                    // Skip the root node
+                } else if (!currentNode.getChildren().containsKey(component)) {
+                    if (isCreate) {
+                        TrieNode newNode = new TrieNode(currentNode.getPath() 
+ "/" + component, component);
+                        currentNode.addChild(component, newNode);
+                        previousNode = currentNode;
+                        currentNode = newNode;
+                    } else {
+                        return currentNode;
+                    }
+                } else {
+                    previousNode = currentNode;
+                    currentNode = currentNode.getChildren().get(component);
+                }
+            }
+
+            if (!isCreate && previousNode != null) {
+                previousNode.getChildren().remove(currentNode.getNodeKey());
+            }
+
+            return currentNode;
+        }
+    }
+}
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5b26896a9..5403e7ca4 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -152,6 +152,21 @@ public interface MetaClientInterface<T> {
    */
   void create(final String key, final T data, final EntryMode mode);
 
+  /**
+   * Create an entry of given EntryMode with given key and data. If any parent 
node in the node
+   * hierarchy does not exist, then the parent node will attempt to be 
created. The entry will not
+   * be created if there is an existing entry with the same full key. 
Ephemeral nodes cannot have
+   * children, so only the final child in the created path will be ephemeral.
+   */
+  void recursiveCreate(final String key, final T Data, final EntryMode mode);
+
+  /**
+   * Create a TTL entry with given key, data, and expiry time (ttl). If any 
parent node in the node
+   * hierarchy does not exist, then the parent node will attempt to be 
created. The entry will not be created if
+   * there is an existing entry with the same full key.
+   */
+  void recursiveCreateWithTTL(String key, T data, long ttl);
+
   /**
    * Create an entry of given EntryMode with given key, data, and expiry time 
(ttl).
    * The entry will automatically purge when reached expiry time and has no 
children.
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
similarity index 50%
copy from 
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
copy to 
meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 9eba28b91..2c23a0a68 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -1,4 +1,4 @@
-package org.apache.helix.metaclient.impl.zk.factory;
+package org.apache.helix.metaclient.factories;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,21 +19,28 @@ package org.apache.helix.metaclient.impl.zk.factory;
  * under the License.
  */
 
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.factories.MetaClientFactory;
-import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 
-public class ZkMetaClientFactory extends MetaClientFactory {
-  @Override
-  public MetaClientInterface getMetaClient(MetaClientConfig config) {
-    if (config == null) {
-      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+
+public class MetaClientCacheConfig {
+    private final String _rootEntry;
+    private final boolean _cacheData;
+    private final boolean _cacheChildren;
+
+    public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean 
cacheChildren) {
+        _rootEntry = rootEntry;
+        _cacheData = cacheData;
+        _cacheChildren = cacheChildren;
+    }
+
+    public String getRootEntry() {
+        return _rootEntry;
     }
-    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())
-        && config instanceof ZkMetaClientConfig) {
-      return new ZkMetaClient((ZkMetaClientConfig) config);
+
+    public boolean getCacheData() {
+        return _cacheData;
+    }
+
+    public boolean getCacheChildren() {
+        return _cacheChildren;
     }
-    throw new IllegalArgumentException("Invalid MetaClientConfig type.");
-  }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
index 045374332..7cc86a8a9 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.factories;
  */
 
 
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
@@ -39,14 +40,27 @@ public class MetaClientFactory {
       throw new IllegalArgumentException("MetaClientConfig cannot be null.");
     }
     if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
-      ZkMetaClientConfig zkMetaClientConfig = new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().
-          setConnectionAddress(config.getConnectionAddress())
-          .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
-          
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
-          .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
-          .build();
-      return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      return new 
ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
     }
     return null;
   }
+
+  public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, 
MetaClientCacheConfig cacheConfig) {
+    if (config == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      return new 
ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), 
cacheConfig);
+    }
+      return null;
+  }
+
+  private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) 
{
+      return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+        setConnectionAddress(config.getConnectionAddress())
+        .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
+        
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
+        .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
+        .build();
+  }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 8753747f3..16d28c6d7 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -41,6 +41,7 @@ import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
@@ -63,6 +64,7 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.separateIntoUniqueNodePaths;
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
 
@@ -104,7 +106,7 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
   }
 
   @Override
-  public void create(String key, Object data, MetaClientInterface.EntryMode 
mode) {
+  public void create(String key, Object data, EntryMode mode) {
 
     try {
       _zkClient.create(key, data, 
ZkMetaClientUtil.convertMetaClientMode(mode));
@@ -115,6 +117,67 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
     }
   }
 
+  @Override
+  public void recursiveCreate(String key, T data, EntryMode mode) {
+    // Function named recursiveCreate to match naming scheme, but actual work 
is iterative
+    iterativeCreate(key, data, mode, -1);
+  }
+
+  @Override
+  public void recursiveCreateWithTTL(String key, T data, long ttl) {
+    iterativeCreate(key, data, EntryMode.TTL, ttl);
+  }
+
+  private void iterativeCreate(String key, T data, EntryMode mode, long ttl) {
+    List<String> nodePaths = separateIntoUniqueNodePaths(key);
+    int i = 0;
+    // Ephemeral nodes cant have children, so change mode when creating parents
+    EntryMode parentMode = (EntryMode.EPHEMERAL.equals(mode) ?
+        EntryMode.PERSISTENT : mode);
+
+    // Iterate over paths, starting with full key then attempting each 
successive parent
+    // Try /a/b/c, if parent /a/b, does not exist, then try to create parent, 
etc..
+    while (i < nodePaths.size()) {
+      // If parent exists or there is no parent node, then try to create the 
node
+      // and break out of loop on successful create
+      if (i == nodePaths.size() - 1 || _zkClient.exists(nodePaths.get(i+1))) {
+        try {
+          if (EntryMode.TTL.equals(mode)) {
+            createWithTTL(nodePaths.get(i), data, ttl);
+          } else {
+            create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
+          }
+        // Race condition may occur where a  node is created by another thread 
in between loops.
+        // We should not throw error if this occurs for parent nodes, only for 
the full node path.
+        } catch (MetaClientNodeExistsException e) {
+          if (i == 0) {
+            throw e;
+          }
+        }
+        break;
+      // Else try to create parent in next loop iteration
+      } else {
+       i++;
+      }
+    }
+
+    // Reattempt creation of children that failed due to parent not existing
+    while (--i >= 0) {
+      try {
+        if (EntryMode.TTL.equals(mode)) {
+          createWithTTL(nodePaths.get(i), data, ttl);
+        } else {
+          create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
+        }
+      // Catch same race condition as above
+      } catch (MetaClientNodeExistsException e) {
+        if (i == 0) {
+          throw e;
+        }
+      }
+    }
+  }
+
   @Override
   public void createWithTTL(String key, T data, long ttl) {
     try {
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
new file mode 100644
index 000000000..45701063c
--- /dev/null
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -0,0 +1,246 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
+import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements 
MetaClientCacheInterface<T> {
+
+    private ConcurrentHashMap<String, T> _dataCacheMap;
+    private final String _rootEntry;
+    private TrieNode _childrenCacheTree;
+    private ChildChangeListener _eventListener;
+    private boolean _cacheData;
+    private boolean _cacheChildren;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetaClientCache.class);
+    private  ZkClient _cacheClient;
+    private ExecutorService executor;
+
+    // TODO: Look into using conditional variable instead of latch.
+    private final CountDownLatch _initializedCache = new CountDownLatch(1);
+
+    /**
+     * Constructor for ZkMetaClientCache.
+     * @param config ZkMetaClientConfig
+     * @param cacheConfig MetaClientCacheConfig
+     */
+    public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig 
cacheConfig) {
+        super(config);
+        _cacheClient = getZkClient();
+        _rootEntry = cacheConfig.getRootEntry();
+        _cacheData = cacheConfig.getCacheData();
+        _cacheChildren = cacheConfig.getCacheChildren();
+
+        if (_cacheData) {
+            _dataCacheMap = new ConcurrentHashMap<>();
+        }
+        if (_cacheChildren) {
+            _childrenCacheTree = new TrieNode(_rootEntry, 
_rootEntry.substring(1));
+        }
+    }
+
+    /**
+     * Get data for a given key.
+     * If datacache is enabled, will fetch for cache. If it doesn't exist
+     * returns null (for when initial populating cache is in progress).
+     * @param key key to identify the entry
+     * @return data for the key
+     */
+    @Override
+    public T get(final String key) {
+        if (_cacheData) {
+            T data = getDataCacheMap().get(key);
+            if (data == null) {
+                LOG.debug("Data not found in cache for key: {}. This could be 
because the cache is still being populated.", key);
+            }
+            return data;
+        }
+        return super.get(key);
+    }
+
+    @Override
+    public List<T> get(List<String> keys) {
+        List<T> dataList = new ArrayList<>();
+        for (String key : keys) {
+            dataList.add(get(key));
+        }
+        return dataList;
+    }
+
+    /**
+     * Get the direct children for a given key.
+     * @param key For metadata storage that has hierarchical key space (e.g. 
ZK), the key would be
+     *            a parent key,
+     *            For metadata storage that has non-hierarchical key space 
(e.g. etcd), the key would
+     *            be a prefix key.
+     * @return list of direct children or null if key doesn't exist / cache is 
not populated yet.
+     */
+    @Override
+    public List<String> getDirectChildrenKeys(final String key) {
+        if (_cacheChildren) {
+            TrieNode node = _childrenCacheTree.processPath(key, true);
+            if (node == null) {
+                LOG.debug("Children not found in cache for key: {}. This could 
be because the cache is still being populated.", key);
+                return null;
+            }
+            return List.copyOf(node.getChildren().keySet());
+        }
+        return super.getDirectChildrenKeys(key);
+    }
+
+    /**
+     * Get the number of direct children for a given key.
+     * @param key For metadata storage that has hierarchical key space (e.g. 
ZK), the key would be
+     *            a parent key,
+     *            For metadata storage that has non-hierarchical key space 
(e.g. etcd), the key would
+     *            be a prefix key.
+     * @return number of direct children or 0 if key doesn't exist / has no 
children / cache is not populated yet.
+     */
+    @Override
+    public int countDirectChildren(final String key) {
+        if (_cacheChildren) {
+            TrieNode node = _childrenCacheTree.processPath(key, true);
+            if (node == null) {
+                LOG.debug("Children not found in cache for key: {}. This could 
be because the cache is still being populated.", key);
+                return 0;
+            }
+            return node.getChildren().size();
+        }
+        return super.countDirectChildren(key);
+    }
+
+    private void populateAllCache() {
+        // TODO: Concurrently populate children and data cache.
+        if (!_cacheClient.exists(_rootEntry)) {
+            LOG.warn("Root entry: {} does not exist.", _rootEntry);
+            // Let the other threads know that the cache is populated.
+            _initializedCache.countDown();
+            return;
+        }
+
+        Queue<String> queue = new ArrayDeque<>();
+        queue.add(_rootEntry);
+
+        while (!queue.isEmpty()) {
+            String node = queue.poll();
+            if (_cacheData) {
+                T dataRecord = _cacheClient.readData(node, true);
+                _dataCacheMap.put(node, dataRecord);
+            }
+            if (_cacheChildren) {
+                _childrenCacheTree.processPath(node, true);
+            }
+            List<String> childNodes = _cacheClient.getChildren(node);
+            for (String child : childNodes) {
+                queue.add(node + "/" + child); // Add child nodes to the queue 
with their full path.
+            }
+        }
+        // Let the other threads know that the cache is populated.
+        _initializedCache.countDown();
+    }
+
+    private class CacheUpdateRunnable implements Runnable {
+        private final String path;
+        private final ChildChangeListener.ChangeType changeType;
+
+        public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType 
changeType) {
+            this.path = path;
+            this.changeType = changeType;
+        }
+
+        @Override
+        public void run() {
+            waitForPopulateAllCache();
+            //  TODO: HANDLE DEDUP EVENT CHANGES
+            switch (changeType) {
+                case ENTRY_CREATED:
+                    _childrenCacheTree.processPath(path, true);
+                    modifyDataInCache(path, false);
+                    break;
+                case ENTRY_DELETED:
+                    _childrenCacheTree.processPath(path, false);
+                    modifyDataInCache(path, true);
+                    break;
+                case ENTRY_DATA_CHANGE:
+                    modifyDataInCache(path, false);
+                    break;
+                default:
+                    LOG.error("Unknown change type: " + changeType);
+            }
+        }
+    }
+
+    private void waitForPopulateAllCache() {
+        try {
+            _initializedCache.await();
+        } catch (InterruptedException e) {
+            throw new MetaClientException("Interrupted while waiting for cache 
to populate.", e);
+        }
+    }
+
+    private void modifyDataInCache(String path, Boolean isDelete) {
+        if (_cacheData) {
+            if (isDelete) {
+                getDataCacheMap().remove(path);
+            } else {
+                T dataRecord = _cacheClient.readData(path, true);
+                getDataCacheMap().put(path, dataRecord);
+            }
+        }
+    }
+
+    public ConcurrentHashMap<String, T> getDataCacheMap() {
+        return _dataCacheMap;
+    }
+
+
+    /**
+     * Connect to the underlying ZkClient.
+     */
+    @Override
+    public void connect() {
+        super.connect();
+        _eventListener = (path, changeType) -> {
+            Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, 
changeType);
+            executor.execute(cacheUpdateRunnable);
+        };
+        executor = Executors.newSingleThreadExecutor();
+        _cacheClient.subscribePersistRecursiveListener(_rootEntry, new 
ChildListenerAdapter(_eventListener));
+        populateAllCache();
+    }
+}
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
index 9eba28b91..c4018eb2f 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
@@ -19,10 +19,13 @@ package org.apache.helix.metaclient.impl.zk.factory;
  * under the License.
  */
 
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
 import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.metaclient.factories.MetaClientFactory;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientCache;
 
 public class ZkMetaClientFactory extends MetaClientFactory {
   @Override
@@ -36,4 +39,16 @@ public class ZkMetaClientFactory extends MetaClientFactory {
     }
     throw new IllegalArgumentException("Invalid MetaClientConfig type.");
   }
+
+  @Override
+  public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, 
MetaClientCacheConfig cacheConfig) {
+    if (config == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())
+            && config instanceof ZkMetaClientConfig) {
+      return new ZkMetaClientCache((ZkMetaClientConfig) config, cacheConfig);
+    }
+    throw new IllegalArgumentException("Invalid MetaClientConfig type.");
+  }
 }
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index f93e98919..aee0c698a 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.impl.zk.util;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -358,4 +359,28 @@ public class ZkMetaClientUtil {
         return MetaClientException.ReturnCode.DB_USER_ERROR;
     }
   }
+
+  // Returns null if no parent path
+  public static String getZkParentPath(String path) {
+    int idx = path.lastIndexOf('/');
+    return idx == 0 ? null : path.substring(0, idx);
+  }
+
+  // Splits a path into the paths for each node along the way.
+  // /a/b/c --> /a/b/c, /a/b, /a
+  public static List<String> separateIntoUniqueNodePaths(String path) {
+    if (path == null || "/".equals(path)) {
+      return null;
+    }
+
+    String[] subPath = path.split("/");
+    String[] nodePaths = new String[subPath.length-1];
+    StringBuilder tempPath = new StringBuilder();
+    for (int i = 1; i < subPath.length; i++) {
+      tempPath.append( "/");
+      tempPath.append(subPath[i]);
+      nodePaths[subPath.length - 1 - i] = tempPath.toString();
+    }
+    return Arrays.asList(nodePaths);
+  }
 }
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
index 6f358f0e8..284ce3645 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.metaclient.api.*;
 import org.apache.helix.metaclient.datamodel.DataRecord;
 import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.recipes.lock.DataRecordSerializer;
 import org.testng.Assert;
@@ -129,6 +130,35 @@ public class TestStressZkClient extends 
ZkMetaClientTestBase {
     Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
   }
 
+  @Test
+  public void testRecursiveCreate() {
+    final String zkParentKey = "/stressZk_testRecursiveCreate";
+    _zkMetaClient.create(zkParentKey, ENTRY_STRING_VALUE);
+
+    int count = (int) Math.pow(TEST_ITERATION_COUNT, 1/3d);
+    for (int i = 0; i < count; i++) {
+
+      for (int j = 0; j < count; j++) {
+
+        for (int k = 0; k < count; k++) {
+          String key = zkParentKey + "/" + i + "/" + j + "/" + k;
+          _zkMetaClient.recursiveCreate(key, String.valueOf(k), PERSISTENT);
+          Assert.assertEquals(String.valueOf(k), _zkMetaClient.get(key));
+        }
+      }
+      try {
+        _zkMetaClient.recursiveCreate(zkParentKey + "/" + i, "should_fail", 
PERSISTENT);
+        Assert.fail("Should have failed due to node existing");
+      } catch (MetaClientNodeExistsException ignoredException) {
+      }
+    }
+
+    // cleanup
+    _zkMetaClient.recursiveDelete(zkParentKey);
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+
+  }
+
   @Test
   public void testGet() {
     final String zkParentKey = "/stressZk_testGet";
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 19f21977b..a5da69f2f 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -26,6 +26,8 @@ import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.api.DirectChildChangeListener;
 
+import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -38,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.NotImplementedException;
-import org.apache.helix.metaclient.api.ConnectStateChangeListener;
 import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
@@ -50,6 +51,7 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
 import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
+import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.EPHEMERAL;
 import static 
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
 
 
@@ -97,6 +99,48 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
     }
   }
 
+  @Test
+  public void testRecursiveCreate() {
+    final String path = "/Test/ZkMetaClient/_fullPath";
+
+
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      MetaClientInterface.EntryMode mode = EPHEMERAL;
+
+      // Should succeed even if one of the parent nodes exists
+      String extendedPath = "/A" + path;
+      zkMetaClient.create("/A", ENTRY_STRING_VALUE, PERSISTENT);
+      zkMetaClient.recursiveCreate(extendedPath, ENTRY_STRING_VALUE, mode);
+      Assert.assertNotNull(zkMetaClient.exists(extendedPath));
+
+      // Should succeed if no parent nodes exist
+      zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
+      Assert.assertNotNull(zkMetaClient.exists(path));
+      
Assert.assertEquals(zkMetaClient.getDataAndStat("/Test").getRight().getEntryType(),
 PERSISTENT);
+      
Assert.assertEquals(zkMetaClient.getDataAndStat(path).getRight().getEntryType(),
 mode);
+
+      // Should throw NodeExistsException if child node exists
+      zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
+      Assert.fail("Should have failed due to node already created");
+    } catch (MetaClientException e) {
+      Assert.assertEquals(e.getMessage(), 
"org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException: 
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists for /Test/ZkMetaClient/_fullPath");
+      System.out.println(e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testRecursiveCreateWithTTL() {
+    final String path = "/Test/ZkMetaClient/_fullPath/withTTL";
+
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      zkMetaClient.recursiveCreateWithTTL(path, ENTRY_STRING_VALUE, 1000);
+      Assert.assertNotNull(zkMetaClient.exists(path));
+    }
+  }
+
   @Test
   public void testRenewTTL() {
     final String key = "/TestZkMetaClient_testRenewTTL_1";
@@ -517,12 +561,20 @@ public class TestZkMetaClient extends 
ZkMetaClientTestBase{
             throws Exception {
         }
       };
-      zkMetaClient.subscribeDataChange(basePath, listener, false);
+      DirectChildChangeListener cldListener = new DirectChildChangeListener() {
+
+        @Override
+        public void handleDirectChildChange(String key) throws Exception {
+
+        }
+      };
+      zkMetaClient.subscribeDataChange(basePath, listener, true);
       zkMetaClient.create(basePath, "");
       zkMetaClient.get(basePath);
       zkMetaClient.exists(basePath);
       zkMetaClient.getDataAndStat(basePath);
       zkMetaClient.getDirectChildrenKeys(basePath);
+      zkMetaClient.subscribeDirectChildChange(basePath, cldListener, true);
     }
   }
 
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
new file mode 100644
index 000000000..2950a652e
--- /dev/null
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -0,0 +1,184 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.MetaClientTestUtil;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+public class TestZkMetaClientCache extends ZkMetaClientTestBase {
+    private static final String DATA_PATH = "/data";
+    private static final String DATA_VALUE = "testData";
+
+    @Test
+    public void testCreateClient() {
+        final String key = "/testCreate";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            // Perform some random non-read operation
+            zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+        }
+    }
+
+    @Test
+    public void testCacheDataUpdates() {
+        final String key = "/testCacheDataUpdates";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, "test");
+            zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+            // Get data for DATA_PATH and cache it
+            String data = zkMetaClientCache.get(key + DATA_PATH);
+            Assert.assertEquals(data, 
zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
data)), MetaClientTestUtil.WAIT_DURATION));
+
+            // Update data for DATA_PATH
+            String newData = zkMetaClientCache.update(key + DATA_PATH, 
currentData -> currentData + "1");
+
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
newData)), MetaClientTestUtil.WAIT_DURATION));
+
+            zkMetaClientCache.delete(key + DATA_PATH);
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    
(Objects.equals(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH), 
null)), MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testGetDirectChildrenKeys() {
+        final String key = "/testGetDirectChildrenKeys";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+
+            Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                    (zkMetaClientCache.getDirectChildrenKeys(key).size() == 
2), MetaClientTestUtil.WAIT_DURATION));
+
+            
Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child1"));
+            
Assert.assertTrue(zkMetaClientCache.getDirectChildrenKeys(key).contains("child2"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testCountDirectChildren() {
+        final String key = "/testCountDirectChildren";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE);
+            zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE);
+            Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.countDirectChildren(key) == 2), 
MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testBatchGet() {
+        final String key = "/testBatchGet";
+        try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, "test");
+            zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+
+            ArrayList<String> keys = new ArrayList<>();
+            keys.add(key);
+            keys.add(key + DATA_PATH);
+
+            ArrayList<String> values = new ArrayList<>();
+            values.add("test");
+            values.add(DATA_VALUE);
+
+            Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(keys).equals(values)), MetaClientTestUtil.WAIT_DURATION));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testLargeClusterLoading() {
+        final String key = "/testLargerNodes";
+        try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+            zkMetaClient.connect();
+
+            int numLayers = 4;
+            int numNodesPerLayer = 20;
+
+            // Create the root node
+            zkMetaClient.create(key, "test");
+
+            Queue<String> queue = new LinkedList<>();
+            queue.offer(key);
+
+            for (int layer = 1; layer <= numLayers; layer++) {
+                int nodesAtThisLayer = Math.min(numNodesPerLayer, queue.size() 
* numNodesPerLayer);
+
+                for (int i = 0; i < nodesAtThisLayer; i++) {
+                    String parentKey = queue.poll();
+                    for (int j = 0; j < numNodesPerLayer; j++) {
+                        String newNodeKey = parentKey + "/node" + j;
+                        zkMetaClient.create(newNodeKey, "test");
+                        queue.offer(newNodeKey);
+                    }
+                }
+            }
+
+            try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) {
+                zkMetaClientCache.connect();
+
+                // Assert Checks on a Random Path
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(key + "/node4/node1").equals("test")), 
MetaClientTestUtil.WAIT_DURATION));
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer), 
MetaClientTestUtil.WAIT_DURATION));
+                String newData = zkMetaClientCache.update(key + 
"/node4/node1", currentData -> currentData + "1");
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> ( 
zkMetaClientCache.get(key + "/node4/node1").equals(newData)), 
MetaClientTestUtil.WAIT_DURATION));
+                Assert.assertTrue(MetaClientTestUtil.verify(() ->
+                        (zkMetaClientCache.getDirectChildrenKeys(key + 
"/node4/node1")
+                                .equals(zkMetaClient.getDirectChildrenKeys(key 
+ "/node4/node1"))), MetaClientTestUtil.WAIT_DURATION));
+
+                zkMetaClientCache.delete(key + "/node4/node1");
+                Assert.assertTrue(MetaClientTestUtil.verify(() -> 
(Objects.equals(zkMetaClientCache.get(key + "/node4/node1"), null)), 
MetaClientTestUtil.WAIT_DURATION));
+
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
+    public ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String 
rootPath) {
+        ZkMetaClientConfig config =
+                new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+                        //.setZkSerializer(new TestStringSerializer())
+                        .build();
+        MetaClientCacheConfig cacheConfig = new 
MetaClientCacheConfig(rootPath, true, true);
+        return new ZkMetaClientCache<>(config, cacheConfig);
+    }
+}
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 2a06158d0..110587e09 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1474,7 +1474,7 @@ public class ZkClient implements Watcher {
   }
 
   public boolean exists(final String path) {
-    return exists(path, hasChildOrDataListeners(path));
+    return exists(path, (!_usePersistWatcher) && 
hasChildOrDataListeners(path));
   }
 
   protected boolean exists(final String path, final boolean watch) {


Reply via email to