This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 5f1a3f716 Merge feature branch (#2647)
5f1a3f716 is described below
commit 5f1a3f716ec71178f2f1f98c2ef48c62d6285df5
Author: xyuanlu <[email protected]>
AuthorDate: Mon Oct 16 15:24:52 2023 -0700
Merge feature branch (#2647)
* MetaClientCache Part 1 - API's, configs, and builders (#2612)
MetaClientCache Part 1 - API's, configs, and builders
---------
Co-authored-by: mapeng <[email protected]>
* Skip one time listener re-register for exists for ZkClient - MetaClient
usage. (#2637)
* Lattice cache - caching just data implementation (#2619)
Lattice cache - caching just data implementation
---------
Co-authored-by: mapeng <[email protected]>
* Add recursiveCreate functionality to metaclient (#2607)
Co-authored-by: Grant Palau Spencer <[email protected]>
* Lattice Children Cache Implementation(#2623)
Co-authored-by: mapeng <[email protected]>
---------
Co-authored-by: Marcos Rico Peng
<[email protected]>
Co-authored-by: mapeng <[email protected]>
Co-authored-by: Grant Paláu Spencer
<[email protected]>
Co-authored-by: Grant Palau Spencer <[email protected]>
---
.../metaclient/api/MetaClientCacheInterface.java | 90 ++++++++
.../helix/metaclient/api/MetaClientInterface.java | 15 ++
.../MetaClientCacheConfig.java} | 37 ++--
.../metaclient/factories/MetaClientFactory.java | 28 ++-
.../helix/metaclient/impl/zk/ZkMetaClient.java | 65 +++++-
.../metaclient/impl/zk/ZkMetaClientCache.java | 246 +++++++++++++++++++++
.../impl/zk/factory/ZkMetaClientFactory.java | 15 ++
.../metaclient/impl/zk/util/ZkMetaClientUtil.java | 25 +++
.../metaclient/impl/zk/TestStressZkClient.java | 30 +++
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 56 ++++-
.../metaclient/impl/zk/TestZkMetaClientCache.java | 184 +++++++++++++++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +-
12 files changed, 767 insertions(+), 26 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
new file mode 100644
index 000000000..3630e1c39
--- /dev/null
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -0,0 +1,90 @@
+package org.apache.helix.metaclient.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
+
+ /**
+ * TrieNode class to store the children of the entries to be cached.
+ */
+ class TrieNode {
+ // A mapping between trie key and children nodes.
+ private Map<String, TrieNode> _children;
+ // the complete path/prefix leading to the current node.
+ private final String _path;
+ private final String _nodeKey;
+
+ public TrieNode(String path, String nodeKey) {
+ _path = path;
+ _nodeKey = nodeKey;
+ _children = new HashMap<>();
+ }
+
+ public Map<String, TrieNode> getChildren() {
+ return _children;
+ }
+
+ public String getPath() {
+ return _path;
+ }
+
+ public String getNodeKey() {
+ return _nodeKey;
+ }
+
+ public void addChild(String key, TrieNode node) {
+ _children.put(key, node);
+ }
+
+ public TrieNode processPath(String path, boolean isCreate) {
+ String[] pathComponents = path.split("/");
+ TrieNode currentNode = this;
+ TrieNode previousNode = null;
+
+ for (int i = 1; i < pathComponents.length; i++) {
+ String component = pathComponents[i];
+ if (component.equals(_nodeKey)) {
+ // Skip the root node
+ } else if (!currentNode.getChildren().containsKey(component)) {
+ if (isCreate) {
+ TrieNode newNode = new TrieNode(currentNode.getPath()
+ "/" + component, component);
+ currentNode.addChild(component, newNode);
+ previousNode = currentNode;
+ currentNode = newNode;
+ } else {
+ return currentNode;
+ }
+ } else {
+ previousNode = currentNode;
+ currentNode = currentNode.getChildren().get(component);
+ }
+ }
+
+ if (!isCreate && previousNode != null) {
+ previousNode.getChildren().remove(currentNode.getNodeKey());
+ }
+
+ return currentNode;
+ }
+ }
+}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 5b26896a9..5403e7ca4 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -152,6 +152,21 @@ public interface MetaClientInterface<T> {
*/
void create(final String key, final T data, final EntryMode mode);
+ /**
+ * Create an entry of given EntryMode with given key and data. If any parent
node in the node
+ * hierarchy does not exist, an attempt will be made to create that parent
node. The entry will not
+ * be created if there is an existing entry with the same full key.
Ephemeral nodes cannot have
+ * children, so only the final child in the created path will be ephemeral.
+ */
+ void recursiveCreate(final String key, final T Data, final EntryMode mode);
+
+ /**
+ * Create a TTL entry with given key, data, and expiry time (ttl). If any
parent node in the node
+ * hierarchy does not exist, an attempt will be made to create that parent
node. The entry will not be created if
+ * there is an existing entry with the same full key.
+ */
+ void recursiveCreateWithTTL(String key, T data, long ttl);
+
/**
* Create an entry of given EntryMode with given key, data, and expiry time
(ttl).
* The entry will automatically purge when reached expiry time and has no
children.
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
similarity index 50%
copy from
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
copy to
meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 9eba28b91..2c23a0a68 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -1,4 +1,4 @@
-package org.apache.helix.metaclient.impl.zk.factory;
+package org.apache.helix.metaclient.factories;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,21 +19,28 @@ package org.apache.helix.metaclient.impl.zk.factory;
* under the License.
*/
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.factories.MetaClientFactory;
-import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
-public class ZkMetaClientFactory extends MetaClientFactory {
- @Override
- public MetaClientInterface getMetaClient(MetaClientConfig config) {
- if (config == null) {
- throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+
+public class MetaClientCacheConfig {
+ private final String _rootEntry;
+ private final boolean _cacheData;
+ private final boolean _cacheChildren;
+
+ public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean
cacheChildren) {
+ _rootEntry = rootEntry;
+ _cacheData = cacheData;
+ _cacheChildren = cacheChildren;
+ }
+
+ public String getRootEntry() {
+ return _rootEntry;
}
- if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())
- && config instanceof ZkMetaClientConfig) {
- return new ZkMetaClient((ZkMetaClientConfig) config);
+
+ public boolean getCacheData() {
+ return _cacheData;
+ }
+
+ public boolean getCacheChildren() {
+ return _cacheChildren;
}
- throw new IllegalArgumentException("Invalid MetaClientConfig type.");
- }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
index 045374332..7cc86a8a9 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.factories;
*/
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
@@ -39,14 +40,27 @@ public class MetaClientFactory {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
- ZkMetaClientConfig zkMetaClientConfig = new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().
- setConnectionAddress(config.getConnectionAddress())
- .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
-
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
- .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
- .build();
- return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+ return new
ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}
+
+ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config,
MetaClientCacheConfig cacheConfig) {
+ if (config == null) {
+ throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+ }
+ if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+ return new
ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config),
cacheConfig);
+ }
+ return null;
+ }
+
+ private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config)
{
+ return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+ setConnectionAddress(config.getConnectionAddress())
+ .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
+
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
+ .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
+ .build();
+ }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 8753747f3..16d28c6d7 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -41,6 +41,7 @@ import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
@@ -63,6 +64,7 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.separateIntoUniqueNodePaths;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
@@ -104,7 +106,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
}
@Override
- public void create(String key, Object data, MetaClientInterface.EntryMode
mode) {
+ public void create(String key, Object data, EntryMode mode) {
try {
_zkClient.create(key, data,
ZkMetaClientUtil.convertMetaClientMode(mode));
@@ -115,6 +117,67 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
}
}
+ @Override
+ public void recursiveCreate(String key, T data, EntryMode mode) {
+ // Function named recursiveCreate to match naming scheme, but actual work
is iterative
+ iterativeCreate(key, data, mode, -1);
+ }
+
+ @Override
+ public void recursiveCreateWithTTL(String key, T data, long ttl) {
+ iterativeCreate(key, data, EntryMode.TTL, ttl);
+ }
+
+ private void iterativeCreate(String key, T data, EntryMode mode, long ttl) {
+ List<String> nodePaths = separateIntoUniqueNodePaths(key);
+ int i = 0;
+ // Ephemeral nodes can't have children, so change mode when creating parents
+ EntryMode parentMode = (EntryMode.EPHEMERAL.equals(mode) ?
+ EntryMode.PERSISTENT : mode);
+
+ // Iterate over paths, starting with full key then attempting each
successive parent
+ // Try /a/b/c; if parent /a/b does not exist, then try to create the parent,
etc.
+ while (i < nodePaths.size()) {
+ // If parent exists or there is no parent node, then try to create the
node
+ // and break out of loop on successful create
+ if (i == nodePaths.size() - 1 || _zkClient.exists(nodePaths.get(i+1))) {
+ try {
+ if (EntryMode.TTL.equals(mode)) {
+ createWithTTL(nodePaths.get(i), data, ttl);
+ } else {
+ create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
+ }
+ // Race condition may occur where a node is created by another thread
in between loops.
+ // We should not throw error if this occurs for parent nodes, only for
the full node path.
+ } catch (MetaClientNodeExistsException e) {
+ if (i == 0) {
+ throw e;
+ }
+ }
+ break;
+ // Else try to create parent in next loop iteration
+ } else {
+ i++;
+ }
+ }
+
+ // Reattempt creation of children that failed due to parent not existing
+ while (--i >= 0) {
+ try {
+ if (EntryMode.TTL.equals(mode)) {
+ createWithTTL(nodePaths.get(i), data, ttl);
+ } else {
+ create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
+ }
+ // Catch same race condition as above
+ } catch (MetaClientNodeExistsException e) {
+ if (i == 0) {
+ throw e;
+ }
+ }
+ }
+ }
+
@Override
public void createWithTTL(String key, T data, long ttl) {
try {
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
new file mode 100644
index 000000000..45701063c
--- /dev/null
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -0,0 +1,246 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
+import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements
MetaClientCacheInterface<T> {
+
+ private ConcurrentHashMap<String, T> _dataCacheMap;
+ private final String _rootEntry;
+ private TrieNode _childrenCacheTree;
+ private ChildChangeListener _eventListener;
+ private boolean _cacheData;
+ private boolean _cacheChildren;
+ private static final Logger LOG =
LoggerFactory.getLogger(ZkMetaClientCache.class);
+ private ZkClient _cacheClient;
+ private ExecutorService executor;
+
+ // TODO: Look into using conditional variable instead of latch.
+ private final CountDownLatch _initializedCache = new CountDownLatch(1);
+
+ /**
+ * Constructor for ZkMetaClientCache.
+ * @param config ZkMetaClientConfig
+ * @param cacheConfig MetaClientCacheConfig
+ */
+ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig
cacheConfig) {
+ super(config);
+ _cacheClient = getZkClient();
+ _rootEntry = cacheConfig.getRootEntry();
+ _cacheData = cacheConfig.getCacheData();
+ _cacheChildren = cacheConfig.getCacheChildren();
+
+ if (_cacheData) {
+ _dataCacheMap = new ConcurrentHashMap<>();
+ }
+ if (_cacheChildren) {
+ _childrenCacheTree = new TrieNode(_rootEntry,
_rootEntry.substring(1));
+ }
+ }
+
+ /**
+ * Get data for a given key.
+ * If the data cache is enabled, fetches from the cache. If the key is not cached,
+ * returns null (e.g. while the initial cache population is still in progress).
+ * @param key key to identify the entry
+ * @return data for the key
+ */
+ @Override
+ public T get(final String key) {
+ if (_cacheData) {
+ T data = getDataCacheMap().get(key);
+ if (data == null) {
+ LOG.debug("Data not found in cache for key: {}. This could be
because the cache is still being populated.", key);
+ }
+ return data;
+ }
+ return super.get(key);
+ }
+
+ @Override
+ public List<T> get(List<String> keys) {
+ List<T> dataList = new ArrayList<>();
+ for (String key : keys) {
+ dataList.add(get(key));
+ }
+ return dataList;
+ }
+
+ /**
+ * Get the direct children for a given key.
+ * @param key For metadata storage that has hierarchical key space (e.g.
ZK), the key would be
+ * a parent key,
+ * For metadata storage that has non-hierarchical key space
(e.g. etcd), the key would
+ * be a prefix key.
+ * @return list of direct children or null if key doesn't exist / cache is
not populated yet.
+ */
+ @Override
+ public List<String> getDirectChildrenKeys(final String key) {
+ if (_cacheChildren) {
+ TrieNode node = _childrenCacheTree.processPath(key, true);
+ if (node == null) {
+ LOG.debug("Children not found in cache for key: {}. This could
be because the cache is still being populated.", key);
+ return null;
+ }
+ return List.copyOf(node.getChildren().keySet());
+ }
+ return super.getDirectChildrenKeys(key);
+ }
+
+ /**
+ * Get the number of direct children for a given key.
+ * @param key For metadata storage that has hierarchical key space (e.g.
ZK), the key would be
+ * a parent key,
+ * For metadata storage that has non-hierarchical key space
(e.g. etcd), the key would
+ * be a prefix key.
+ * @return number of direct children or 0 if key doesn't exist / has no
children / cache is not populated yet.
+ */
+ @Override
+ public int countDirectChildren(final String key) {
+ if (_cacheChildren) {
+ TrieNode node = _childrenCacheTree.processPath(key, true);
+ if (node == null) {
+ LOG.debug("Children not found in cache for key: {}. This could
be because the cache is still being populated.", key);
+ return 0;
+ }
+ return node.getChildren().size();
+ }
+ return super.countDirectChildren(key);
+ }
+
+ private void populateAllCache() {
+ // TODO: Concurrently populate children and data cache.
+ if (!_cacheClient.exists(_rootEntry)) {
+ LOG.warn("Root entry: {} does not exist.", _rootEntry);
+ // Let the other threads know that the cache is populated.
+ _initializedCache.countDown();
+ return;
+ }
+
+ Queue<String> queue = new ArrayDeque<>();
+ queue.add(_rootEntry);
+
+ while (!queue.isEmpty()) {
+ String node = queue.poll();
+ if (_cacheData) {
+ T dataRecord = _cacheClient.readData(node, true);
+ _dataCacheMap.put(node, dataRecord);
+ }
+ if (_cacheChildren) {
+ _childrenCacheTree.processPath(node, true);
+ }
+ List<String> childNodes = _cacheClient.getChildren(node);
+ for (String child : childNodes) {
+ queue.add(node + "/" + child); // Add child nodes to the queue
with their full path.
+ }
+ }
+ // Let the other threads know that the cache is populated.
+ _initializedCache.countDown();
+ }
+
+ private class CacheUpdateRunnable implements Runnable {
+ private final String path;
+ private final ChildChangeListener.ChangeType changeType;
+
+ public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType
changeType) {
+ this.path = path;
+ this.changeType = changeType;
+ }
+
+ @Override
+ public void run() {
+ waitForPopulateAllCache();
+ // TODO: HANDLE DEDUP EVENT CHANGES
+ switch (changeType) {
+ case ENTRY_CREATED:
+ _childrenCacheTree.processPath(path, true);
+ modifyDataInCache(path, false);
+ break;
+ case ENTRY_DELETED:
+ _childrenCacheTree.processPath(path, false);
+ modifyDataInCache(path, true);
+ break;
+ case ENTRY_DATA_CHANGE:
+ modifyDataInCache(path, false);
+ break;
+ default:
+ LOG.error("Unknown change type: " + changeType);
+ }
+ }
+ }
+
+ private void waitForPopulateAllCache() {
+ try {
+ _initializedCache.await();
+ } catch (InterruptedException e) {
+ throw new MetaClientException("Interrupted while waiting for cache
to populate.", e);
+ }
+ }
+
+ private void modifyDataInCache(String path, Boolean isDelete) {
+ if (_cacheData) {
+ if (isDelete) {
+ getDataCacheMap().remove(path);
+ } else {
+ T dataRecord = _cacheClient.readData(path, true);
+ getDataCacheMap().put(path, dataRecord);
+ }
+ }
+ }
+
+ public ConcurrentHashMap<String, T> getDataCacheMap() {
+ return _dataCacheMap;
+ }
+
+
+ /**
+ * Connect to the underlying ZkClient.
+ */
+ @Override
+ public void connect() {
+ super.connect();
+ _eventListener = (path, changeType) -> {
+ Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path,
changeType);
+ executor.execute(cacheUpdateRunnable);
+ };
+ executor = Executors.newSingleThreadExecutor();
+ _cacheClient.subscribePersistRecursiveListener(_rootEntry, new
ChildListenerAdapter(_eventListener));
+ populateAllCache();
+ }
+}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
index 9eba28b91..c4018eb2f 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
@@ -19,10 +19,13 @@ package org.apache.helix.metaclient.impl.zk.factory;
* under the License.
*/
+import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.factories.MetaClientFactory;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientCache;
public class ZkMetaClientFactory extends MetaClientFactory {
@Override
@@ -36,4 +39,16 @@ public class ZkMetaClientFactory extends MetaClientFactory {
}
throw new IllegalArgumentException("Invalid MetaClientConfig type.");
}
+
+ @Override
+ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config,
MetaClientCacheConfig cacheConfig) {
+ if (config == null) {
+ throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+ }
+ if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())
+ && config instanceof ZkMetaClientConfig) {
+ return new ZkMetaClientCache((ZkMetaClientConfig) config, cacheConfig);
+ }
+ throw new IllegalArgumentException("Invalid MetaClientConfig type.");
+ }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index f93e98919..aee0c698a 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.impl.zk.util;
*/
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
@@ -358,4 +359,28 @@ public class ZkMetaClientUtil {
return MetaClientException.ReturnCode.DB_USER_ERROR;
}
}
+
+ // Returns null if no parent path
+ public static String getZkParentPath(String path) {
+ int idx = path.lastIndexOf('/');
+ return idx == 0 ? null : path.substring(0, idx);
+ }
+
+ // Splits a path into the paths for each node along the way.
+ // /a/b/c --> /a/b/c, /a/b, /a
+ public static List<String> separateIntoUniqueNodePaths(String path) {
+ if (path == null || "/".equals(path)) {
+ return null;
+ }
+
+ String[] subPath = path.split("/");
+ String[] nodePaths = new String[subPath.length-1];
+ StringBuilder tempPath = new StringBuilder();
+ for (int i = 1; i < subPath.length; i++) {
+ tempPath.append( "/");
+ tempPath.append(subPath[i]);
+ nodePaths[subPath.length - 1 - i] = tempPath.toString();
+ }
+ return Arrays.asList(nodePaths);
+ }
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
index 6f358f0e8..284ce3645 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestStressZkClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.metaclient.api.*;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.recipes.lock.DataRecordSerializer;
import org.testng.Assert;
@@ -129,6 +130,35 @@ public class TestStressZkClient extends
ZkMetaClientTestBase {
Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
}
+ @Test
+ public void testRecursiveCreate() {
+ final String zkParentKey = "/stressZk_testRecursiveCreate";
+ _zkMetaClient.create(zkParentKey, ENTRY_STRING_VALUE);
+
+ int count = (int) Math.pow(TEST_ITERATION_COUNT, 1/3d);
+ for (int i = 0; i < count; i++) {
+
+ for (int j = 0; j < count; j++) {
+
+ for (int k = 0; k < count; k++) {
+ String key = zkParentKey + "/" + i + "/" + j + "/" + k;
+ _zkMetaClient.recursiveCreate(key, String.valueOf(k), PERSISTENT);
+ Assert.assertEquals(String.valueOf(k), _zkMetaClient.get(key));
+ }
+ }
+ try {
+ _zkMetaClient.recursiveCreate(zkParentKey + "/" + i, "should_fail",
PERSISTENT);
+ Assert.fail("Should have failed due to node existing");
+ } catch (MetaClientNodeExistsException ignoredException) {
+ }
+ }
+
+ // cleanup
+ _zkMetaClient.recursiveDelete(zkParentKey);
+ Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+
+ }
+
@Test
public void testGet() {
final String zkParentKey = "/stressZk_testGet";
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 19f21977b..a5da69f2f 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -26,6 +26,8 @@ import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -38,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.NotImplementedException;
-import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
@@ -50,6 +51,7 @@ import org.testng.annotations.Test;
import static
org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
+import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.EPHEMERAL;
import static
org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
@@ -97,6 +99,48 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
}
}
+ @Test
+ public void testRecursiveCreate() {
+ final String path = "/Test/ZkMetaClient/_fullPath";
+
+
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ MetaClientInterface.EntryMode mode = EPHEMERAL;
+
+ // Should succeed even if one of the parent nodes exists
+ String extendedPath = "/A" + path;
+ zkMetaClient.create("/A", ENTRY_STRING_VALUE, PERSISTENT);
+ zkMetaClient.recursiveCreate(extendedPath, ENTRY_STRING_VALUE, mode);
+ Assert.assertNotNull(zkMetaClient.exists(extendedPath));
+
+ // Should succeed if no parent nodes exist
+ zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
+ Assert.assertNotNull(zkMetaClient.exists(path));
+
Assert.assertEquals(zkMetaClient.getDataAndStat("/Test").getRight().getEntryType(),
PERSISTENT);
+
Assert.assertEquals(zkMetaClient.getDataAndStat(path).getRight().getEntryType(),
mode);
+
+ // Should throw NodeExistsException if child node exists
+ zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
+ Assert.fail("Should have failed due to node already created");
+ } catch (MetaClientException e) {
+ Assert.assertEquals(e.getMessage(),
"org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException:
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
NodeExists for /Test/ZkMetaClient/_fullPath");
+ System.out.println(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testRecursiveCreateWithTTL() {
+ final String path = "/Test/ZkMetaClient/_fullPath/withTTL";
+
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ zkMetaClient.recursiveCreateWithTTL(path, ENTRY_STRING_VALUE, 1000);
+ Assert.assertNotNull(zkMetaClient.exists(path));
+ }
+ }
+
@Test
public void testRenewTTL() {
final String key = "/TestZkMetaClient_testRenewTTL_1";
@@ -517,12 +561,20 @@ public class TestZkMetaClient extends
ZkMetaClientTestBase{
throws Exception {
}
};
- zkMetaClient.subscribeDataChange(basePath, listener, false);
+ DirectChildChangeListener cldListener = new DirectChildChangeListener() {
+
+ @Override
+ public void handleDirectChildChange(String key) throws Exception {
+
+ }
+ };
+ zkMetaClient.subscribeDataChange(basePath, listener, true);
zkMetaClient.create(basePath, "");
zkMetaClient.get(basePath);
zkMetaClient.exists(basePath);
zkMetaClient.getDataAndStat(basePath);
zkMetaClient.getDirectChildrenKeys(basePath);
+ zkMetaClient.subscribeDirectChildChange(basePath, cldListener, true);
}
}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
new file mode 100644
index 000000000..2950a652e
--- /dev/null
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -0,0 +1,184 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.MetaClientTestUtil;
+import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+public class TestZkMetaClientCache extends ZkMetaClientTestBase {
+ private static final String DATA_PATH = "/data";
+ private static final String DATA_VALUE = "testData";
+
+  /**
+   * Smoke test: a cache client can be constructed, connected, and used for one write call.
+   */
+  @Test
+  public void testCreateClient() {
+    final String rootKey = "/testCreate";
+    try (ZkMetaClientCache<String> cache = createZkMetaClientCacheLazyCaching(rootKey)) {
+      cache.connect();
+      // Exercise a single non-read operation on the connected client
+      cache.create(rootKey, ENTRY_STRING_VALUE);
+    }
+  }
+
+  /**
+   * Follows one data node through read, update and delete, and checks after each step that the
+   * cache's data map ends up holding the matching value (or no value once the node is deleted).
+   */
+  @Test
+  public void testCacheDataUpdates() {
+    final String key = "/testCacheDataUpdates";
+    final String dataKey = key + DATA_PATH;
+    try (ZkMetaClientCache<String> cache = createZkMetaClientCacheLazyCaching(key)) {
+      cache.connect();
+      cache.create(key, "test");
+      cache.create(dataKey, DATA_VALUE);
+
+      // Read the node through the cache client, then compare against the cached entry
+      final String data = cache.get(dataKey);
+      Assert.assertEquals(data, cache.getDataCacheMap().get(dataKey));
+      boolean cachedAfterRead = MetaClientTestUtil.verify(
+          () -> Objects.equals(cache.getDataCacheMap().get(dataKey), data),
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(cachedAfterRead);
+
+      // Update the node and wait for the cached entry to match the returned value
+      final String newData = cache.update(dataKey, current -> current + "1");
+      boolean cachedAfterUpdate = MetaClientTestUtil.verify(
+          () -> Objects.equals(cache.getDataCacheMap().get(dataKey), newData),
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(cachedAfterUpdate);
+
+      // Delete the node and wait for the cached entry to disappear
+      cache.delete(dataKey);
+      boolean evictedAfterDelete = MetaClientTestUtil.verify(
+          () -> cache.getDataCacheMap().get(dataKey) == null,
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(evictedAfterDelete);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a parent with two children and checks that the cache client eventually reports
+   * exactly two direct child keys, named "child1" and "child2".
+   */
+  @Test
+  public void testGetDirectChildrenKeys() {
+    final String key = "/testGetDirectChildrenKeys";
+    final List<String> childNames = Arrays.asList("child1", "child2");
+    try (ZkMetaClientCache<String> cache = createZkMetaClientCacheLazyCaching(key)) {
+      cache.connect();
+      cache.create(key, ENTRY_STRING_VALUE);
+      for (String childName : childNames) {
+        cache.create(key + "/" + childName, ENTRY_STRING_VALUE);
+      }
+
+      // Poll until the child list reaches the expected size
+      boolean sizeMatches = MetaClientTestUtil.verify(
+          () -> cache.getDirectChildrenKeys(key).size() == 2,
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(sizeMatches);
+
+      // Child keys are compared as bare names, without the parent prefix
+      for (String childName : childNames) {
+        Assert.assertTrue(cache.getDirectChildrenKeys(key).contains(childName));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a parent with two children and checks that countDirectChildren on the cache client
+   * eventually returns 2.
+   */
+  @Test
+  public void testCountDirectChildren() {
+    final String key = "/testCountDirectChildren";
+    try (ZkMetaClientCache<String> cache = createZkMetaClientCacheLazyCaching(key)) {
+      cache.connect();
+      cache.create(key, ENTRY_STRING_VALUE);
+      for (String childName : Arrays.asList("child1", "child2")) {
+        cache.create(key + "/" + childName, ENTRY_STRING_VALUE);
+      }
+
+      boolean countMatches = MetaClientTestUtil.verify(
+          () -> cache.countDirectChildren(key) == 2,
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(countMatches);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a root node and one data node, then checks that a batch get over both keys
+   * eventually returns their values in the same order as the requested keys.
+   */
+  @Test
+  public void testBatchGet() {
+    final String key = "/testBatchGet";
+    try (ZkMetaClientCache<String> cache = createZkMetaClientCacheLazyCaching(key)) {
+      cache.connect();
+      cache.create(key, "test");
+      cache.create(key + DATA_PATH, DATA_VALUE);
+
+      // Keys and their expected values, position by position
+      final List<String> keys = Arrays.asList(key, key + DATA_PATH);
+      final List<String> expectedValues = Arrays.asList("test", DATA_VALUE);
+
+      boolean batchMatches = MetaClientTestUtil.verify(
+          () -> cache.get(keys).equals(expectedValues),
+          MetaClientTestUtil.WAIT_DURATION);
+      Assert.assertTrue(batchMatches);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds a large tree with a plain ZkMetaClient, then attaches a cache client rooted at the
+   * same key and checks reads, child counts, updates and deletes on one sample path.
+   */
+  @Test
+  public void testLargeClusterLoading() {
+    final String key = "/testLargerNodes";
+    // Sample node used for every check below
+    final String sampleKey = key + "/node4/node1";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+
+      final int numLayers = 4;
+      final int numNodesPerLayer = 20;
+
+      // Create the root node
+      zkMetaClient.create(key, "test");
+
+      // Breadth-first expansion: each round takes numNodesPerLayer parents off the queue and
+      // gives every one of them numNodesPerLayer children. The queue is never empty when a
+      // round starts, so the number of parents expanded per round is always numNodesPerLayer.
+      Queue<String> pending = new ArrayDeque<>();
+      pending.offer(key);
+      for (int layer = 1; layer <= numLayers; layer++) {
+        for (int i = 0; i < numNodesPerLayer; i++) {
+          String parentKey = pending.poll();
+          for (int j = 0; j < numNodesPerLayer; j++) {
+            String newNodeKey = parentKey + "/node" + j;
+            zkMetaClient.create(newNodeKey, "test");
+            pending.offer(newNodeKey);
+          }
+        }
+      }
+
+      try (ZkMetaClientCache<String> zkMetaClientCache =
+          createZkMetaClientCacheLazyCaching(key)) {
+        zkMetaClientCache.connect();
+
+        // Read and child-count checks on the sample path
+        Assert.assertTrue(MetaClientTestUtil.verify(
+            () -> zkMetaClientCache.get(sampleKey).equals("test"),
+            MetaClientTestUtil.WAIT_DURATION));
+        Assert.assertTrue(MetaClientTestUtil.verify(
+            () -> zkMetaClientCache.countDirectChildren(key) == numNodesPerLayer,
+            MetaClientTestUtil.WAIT_DURATION));
+
+        // An update through the cache client must become visible through it
+        final String newData =
+            zkMetaClientCache.update(sampleKey, currentData -> currentData + "1");
+        Assert.assertTrue(MetaClientTestUtil.verify(
+            () -> zkMetaClientCache.get(sampleKey).equals(newData),
+            MetaClientTestUtil.WAIT_DURATION));
+
+        // Child keys seen through the cache must match those of the plain client
+        Assert.assertTrue(MetaClientTestUtil.verify(
+            () -> zkMetaClientCache.getDirectChildrenKeys(sampleKey)
+                .equals(zkMetaClient.getDirectChildrenKeys(sampleKey)),
+            MetaClientTestUtil.WAIT_DURATION));
+
+        // After deletion the cache client is expected to return null for the key
+        zkMetaClientCache.delete(sampleKey);
+        Assert.assertTrue(MetaClientTestUtil.verify(
+            () -> zkMetaClientCache.get(sampleKey) == null,
+            MetaClientTestUtil.WAIT_DURATION));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  /**
+   * Builds a ZkMetaClientCache that connects to ZK_ADDR and uses a cache config created for the
+   * given root path.
+   *
+   * @param rootPath path handed to MetaClientCacheConfig as its first argument
+   * @return a new, not yet connected, cache client
+   */
+  public ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
+    ZkMetaClientConfig.ZkMetaClientConfigBuilder clientConfigBuilder =
+        new ZkMetaClientConfig.ZkMetaClientConfigBuilder();
+    clientConfigBuilder.setConnectionAddress(ZK_ADDR);
+    // NOTE(review): the two boolean flags presumably switch on data and children caching;
+    // confirm their meaning and order against the MetaClientCacheConfig constructor.
+    return new ZkMetaClientCache<>(
+        clientConfigBuilder.build(), new MetaClientCacheConfig(rootPath, true, true));
+  }
+}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 2a06158d0..110587e09 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1474,7 +1474,7 @@ public class ZkClient implements Watcher {
}
public boolean exists(final String path) {
- return exists(path, hasChildOrDataListeners(path));
+ return exists(path, (!_usePersistWatcher) &&
hasChildOrDataListeners(path));
}
protected boolean exists(final String path, final boolean watch) {