This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 827ca822d2dff97f936608a033c30e252cedd1f1 Author: Marcos Rico Peng <[email protected]> AuthorDate: Wed Sep 27 18:32:25 2023 -0700 Lattice cache - caching just data implementation (#2619) Lattice cache - caching just data implementation --------- Co-authored-by: mapeng <[email protected]> --- .../metaclient/api/MetaClientCacheInterface.java | 2 +- .../factories/MetaClientCacheConfig.java | 5 - .../metaclient/factories/MetaClientFactory.java | 25 ++-- .../metaclient/impl/zk/ZkMetaClientCache.java | 153 ++++++++++++++++++--- .../metaclient/impl/zk/TestZkMetaClientCache.java | 85 ++++++++++-- 5 files changed, 220 insertions(+), 50 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java index 348bd0929..6449970bb 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java @@ -36,7 +36,7 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> { private final String _nodeKey; - TrieNode(String path, String nodeKey) { + public TrieNode(String path, String nodeKey) { _path = path; _nodeKey = nodeKey; _children = new HashMap<>(); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java index 9e0323601..07972945a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java @@ -25,13 +25,11 @@ public class MetaClientCacheConfig { private final String _rootEntry; private boolean _cacheData = false; private boolean _cacheChildren = false; - private boolean _lazyCaching = true; public 
MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) { _rootEntry = rootEntry; _cacheData = cacheData; _cacheChildren = cacheChildren; - _lazyCaching = lazyCaching; } public String getRootEntry() { @@ -46,7 +44,4 @@ public class MetaClientCacheConfig { return _cacheChildren; } - public boolean getLazyCaching() { - return _lazyCaching; - } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java index ebb4549da..7cc86a8a9 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java @@ -40,13 +40,7 @@ public class MetaClientFactory { throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). - setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); + return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config)); } return null; } @@ -56,14 +50,17 @@ public class MetaClientFactory { throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). 
- setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig); + return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig); } return null; } + + private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) { + return new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). + setConnectionAddress(config.getConnectionAddress()) + .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) + .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) + .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) + .build(); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java index af1c9d791..66eb6f91e 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java @@ -21,31 +21,38 @@ package org.apache.helix.metaclient.impl.zk; import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.MetaClientCacheInterface; -import org.apache.helix.metaclient.datamodel.DataRecord; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.factories.MetaClientCacheConfig; -import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; -import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; -import 
org.apache.helix.metaclient.recipes.lock.LockInfoSerializer; import org.apache.helix.zookeeper.zkclient.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Queue; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> { - private Map<String, DataRecord> _dataCacheMap; + private ConcurrentHashMap<String, T> _dataCacheMap; private final String _rootEntry; private TrieNode _childrenCacheTree; private ChildChangeListener _eventListener; private boolean _cacheData; private boolean _cacheChildren; - private boolean _lazyCaching; private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class); private ZkClient _cacheClient; + private ExecutorService executor; + + // TODO: Look into using a condition variable instead of a latch. + private final CountDownLatch _initializedCache = new CountDownLatch(1); /** * Constructor for ZkMetaClientCache. @@ -56,19 +63,43 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC super(config); _cacheClient = getZkClient(); _rootEntry = cacheConfig.getRootEntry(); - _lazyCaching = cacheConfig.getLazyCaching(); _cacheData = cacheConfig.getCacheData(); _cacheChildren = cacheConfig.getCacheChildren(); + + if (_cacheData) { + _dataCacheMap = new ConcurrentHashMap<>(); + } + if (_cacheChildren) { + _childrenCacheTree = new TrieNode(_rootEntry, null); + } } + /** + * Get data for a given key. + * If the data cache is enabled, the value is read from the cache. If the key is not cached, + * returns null (e.g. while the initial cache population is still in progress).
+ * @param key key to identify the entry + * @return data for the key + */ @Override - public Stat exists(String key) { - throw new MetaClientException("Not implemented yet."); + public T get(final String key) { + if (_cacheData) { + T data = getDataCacheMap().get(key); + if (data == null) { + LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key); + } + return data; + } + return super.get(key); } @Override - public T get(final String key) { - throw new MetaClientException("Not implemented yet."); + public List<T> get(List<String> keys) { + List<T> dataList = new ArrayList<>(); + for (String key : keys) { + dataList.add(get(key)); + } + return dataList; } @Override @@ -81,14 +112,100 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC throw new MetaClientException("Not implemented yet."); } - @Override - public List<T> get(List<String> keys) { - throw new MetaClientException("Not implemented yet."); + private void populateAllCache() { + // TODO: Concurrently populate children and data cache. + if (!_cacheClient.exists(_rootEntry)) { + LOG.warn("Root entry: {} does not exist.", _rootEntry); + // Let the other threads know that the cache is populated. + _initializedCache.countDown(); + return; + } + + Queue<String> queue = new ArrayDeque<>(); + queue.add(_rootEntry); + + while (!queue.isEmpty()) { + String node = queue.poll(); + if (_cacheData) { + T dataRecord = _cacheClient.readData(node, true); + _dataCacheMap.put(node, dataRecord); + } + queue.addAll(_cacheClient.getChildren(node)); + } + // Let the other threads know that the cache is populated. 
+ _initializedCache.countDown(); } - @Override - public List<Stat> exists(List<String> keys) { - throw new MetaClientException("Not implemented yet."); + private class CacheUpdateRunnable implements Runnable { + private final String path; + private final ChildChangeListener.ChangeType changeType; + + public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) { + this.path = path; + this.changeType = changeType; + } + + @Override + public void run() { + waitForPopulateAllCache(); + // TODO: HANDLE DEDUP EVENT CHANGES + switch (changeType) { + case ENTRY_CREATED: + // Not implemented yet. + modifyDataInCache(path, false); + break; + case ENTRY_DELETED: + // Not implemented yet. + modifyDataInCache(path, true); + break; + case ENTRY_DATA_CHANGE: + modifyDataInCache(path, false); + break; + default: + LOG.error("Unknown change type: " + changeType); + } + } + } + + private void waitForPopulateAllCache() { + try { + _initializedCache.await(); + } catch (InterruptedException e) { + throw new MetaClientException("Interrupted while waiting for cache to populate.", e); + } + } + + private void modifyDataInCache(String path, Boolean isDelete) { + if (_cacheData) { + if (isDelete) { + getDataCacheMap().remove(path); + } else { + T dataRecord = _cacheClient.readData(path, true); + getDataCacheMap().put(path, dataRecord); + } + } } + public ConcurrentHashMap<String, T> getDataCacheMap() { + return _dataCacheMap; + } + + public TrieNode getChildrenCacheTree() { + return _childrenCacheTree; + } + + /** + * Connect to the underlying ZkClient. 
+ */ + @Override + public void connect() { + super.connect(); + _eventListener = (path, changeType) -> { + Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType); + executor.execute(cacheUpdateRunnable); + }; + executor = Executors.newSingleThreadExecutor(); + _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener)); + populateAllCache(); + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java index a3a5b4eee..30a0a729d 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java @@ -19,39 +19,100 @@ package org.apache.helix.metaclient.impl.zk; * under the License. */ - import org.apache.helix.metaclient.factories.MetaClientCacheConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; + public class TestZkMetaClientCache extends ZkMetaClientTestBase { - private static final String PATH = "/Cache"; + private static final String DATA_PATH = "/data"; + private static final String DATA_VALUE = "testData"; @Test public void testCreateClient() { - final String key = "/TestZkMetaClientCache_testCreate"; - try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) { + final String key = "/testCreate"; + try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { zkMetaClientCache.connect(); // Perform some random non-read operation zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + } + } + + @Test + public void testCacheDataUpdates() { + final String key = "/testCacheDataUpdates"; + try (ZkMetaClientCache<String> zkMetaClientCache = 
createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, "test"); + zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); + + // Get data for DATA_PATH and cache it + String data = zkMetaClientCache.get(key + DATA_PATH); + Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + + // Update data for DATA_PATH + String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1"); + + // Verify that cached data is updated. Might take some time + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) { + break; + } + Thread.sleep(1000); + } + Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + + zkMetaClientCache.delete(key + DATA_PATH); + // Verify that cached data is updated. Might take some time + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) { + break; + } + Thread.sleep(1000); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testBatchGet() { + final String key = "/testBatchGet"; + try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, "test"); + zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); + + ArrayList<String> keys = new ArrayList<>(); + keys.add(key); + keys.add(key + DATA_PATH); + + ArrayList<String> values = new ArrayList<>(); + values.add("test"); + values.add(DATA_VALUE); - try { - //Perform some read operation - should fail. - // TODO: Remove this once implemented. 
- zkMetaClientCache.get(key); - Assert.fail("Should have failed with non implemented yet."); - } catch (Exception ignored) { + for (int i = 0; i < 10; i++) { + // Get data for DATA_PATH and cache it + List<String> data = zkMetaClientCache.get(keys); + if (data.equals(values)) { + break; + } + Thread.sleep(1000); } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } - protected static ZkMetaClientCache<String> createZkMetaClientCache() { + protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) { ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) //.setZkSerializer(new TestStringSerializer()) .build(); - MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true); + MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true); return new ZkMetaClientCache<>(config, cacheConfig); } }
