Repository: curator
Updated Branches:
  refs/heads/persistent-watch 2bbdbfd5f -> fc2219ea9

CachedModeledFramework can use the new CuratorCache


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fc2219ea
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fc2219ea
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fc2219ea

Branch: refs/heads/persistent-watch
Commit: fc2219ea9dc365cf0226593f9260361407a4ef6f
Parents: 2bbdbfd
Author: randgalt <[email protected]>
Authored: Thu Aug 3 12:20:02 2017 -0500
Committer: randgalt <[email protected]>
Committed: Thu Aug 3 12:20:02 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CacheAction.java    |  5 ++
 .../framework/recipes/watch/CacheSelectors.java | 13 +++++
 .../recipes/watch/CuratorCacheBuilder.java      |  1 +
 .../recipes/watch/InternalCuratorCache.java     |  2 +
 .../async/modeled/details/ModeledCacheImpl.java | 56 ++++++++++----------
 5 files changed, 50 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
index 57a48f1..8cfca93 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -38,6 +38,11 @@ public enum CacheAction
     PATH_ONLY,
 
     /**
+     * The node and its {@link Stat} are stored - in events, however, uncompressed data is sent (but not stored)
+     */
+    UNCOMPRESSED_STAT_ONLY,
+
+    /**
      * The node and its {@link Stat} are stored
      */
     STAT_ONLY,

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index db1a8a8..eaf1145 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -35,6 +35,7 @@ public class CacheSelectors
 {
     private static final CacheSelector statAndData = new StandardCacheSelector(CacheAction.STAT_AND_DATA);
     private static final CacheSelector uncompressedStatAndData = new StandardCacheSelector(CacheAction.STAT_AND_UNCOMPRESSED_DATA);
+    private static final CacheSelector uncompressedStatOnly = new StandardCacheSelector(CacheAction.UNCOMPRESSED_STAT_ONLY);
     private static final CacheSelector statOnly = new StandardCacheSelector(CacheAction.STAT_ONLY);
     private static final CacheSelector pathOnly = new StandardCacheSelector(CacheAction.PATH_ONLY);
 
@@ -156,6 +157,18 @@ public class CacheSelectors
 
     /**
      * Returns a cache selector that stores only the stat and processes the entire tree
+     * from the root path given to the cache builder. In events, however, uncompressed data is
+     * sent (but not stored).
+     *
+     * @return selector
+     */
+    public static CacheSelector getUncompressedStatOnly()
+    {
+        return uncompressedStatOnly;
+    }
+
+    /**
+     * Returns a cache selector that stores only the stat and processes the entire tree
      * from the root path given to the cache builder
      *
      * @return selector

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 19c27a9..10f8bc2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -65,6 +65,7 @@ public class CuratorCacheBuilder
         if ( singleNodeCacheAction != null )
         {
             Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors");
+            Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY");
             return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart);
         }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 9d94c34..e2b1bf3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -231,6 +231,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                 }
 
                 case STAT_AND_UNCOMPRESSED_DATA:
+                case UNCOMPRESSED_STAT_ONLY:
                 {
                     try
                     {
@@ -289,6 +290,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                 }
 
                 case STAT_ONLY:
+                case UNCOMPRESSED_STAT_ONLY:
                 {
                     putNode = new CachedNode(newNode.getStat());
                     break;

http://git-wip-us.apache.org/repos/asf/curator/blob/fc2219ea/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 72e6762..ce73a9b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -21,17 +21,20 @@ package org.apache.curator.x.async.modeled.details;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.watch.CacheEvent;
+import org.apache.curator.framework.recipes.watch.CacheListener;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
@@ -40,9 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
-class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
+class ModeledCacheImpl<T> implements CacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCache cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
@@ -69,11 +72,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
 
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
+        boolean dataIsCompressed = modelSpec.createOptions().contains(CreateOption.compress);
+        cache = CuratorCacheBuilder.builder(client, basePath.fullPath())
+            .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.getUncompressedStatOnly())
+            .sendingRefreshEvents(true)
             .build();
     }
 
@@ -134,11 +136,11 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
     }
 
     @Override
-    public void childEvent(CuratorFramework client, TreeCacheEvent event)
+    public void process(CacheEvent event, String path, CachedNode affectedNode)
     {
         try
         {
-            internalChildEvent(event);
+            internalChildEvent(event, path, affectedNode);
         }
         catch ( Exception e )
         {
@@ -151,42 +153,42 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         }
     }
 
-    private void internalChildEvent(TreeCacheEvent event) throws Exception
+    private void internalChildEvent(CacheEvent event, String pathStr, CachedNode affectedNode) throws Exception
     {
-        switch ( event.getType() )
+        switch ( event )
         {
-            case NODE_ADDED:
-            case NODE_UPDATED:
+            case NODE_CREATED:
+            case NODE_CHANGED:
             {
-                ZPath path = ZPath.parse(event.getData().getPath());
+                ZPath path = ZPath.parse(pathStr);
                 if ( !path.equals(basePath) )
                 {
-                    byte[] bytes = event.getData().getData();
+                    byte[] bytes = affectedNode.getData();
                     if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
                     {
                         T model = serializer.deserialize(bytes);
-                        entries.put(path, new Entry<>(event.getData().getStat(), model));
-                        ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
-                        accept(type, path, event.getData().getStat(), model);
+                        entries.put(path, new Entry<>(affectedNode.getStat(), model));
+                        ModeledCacheListener.Type type = (event == CacheEvent.NODE_CREATED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                        accept(type, path, affectedNode.getStat(), model);
                     }
                 }
                 break;
             }
 
-            case NODE_REMOVED:
+            case NODE_DELETED:
             {
-                ZPath path = ZPath.parse(event.getData().getPath());
+                ZPath path = ZPath.parse(pathStr);
                 if ( !path.equals(basePath) )
                 {
                     Entry<T> entry = entries.remove(path);
-                    T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
-                    Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+                    T model = (entry != null) ? entry.model : serializer.deserialize(affectedNode.getData());
+                    Stat stat = (entry != null) ? entry.stat : affectedNode.getStat();
                     accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
                 }
                 break;
             }
 
-            case INITIALIZED:
+            case CACHE_REFRESHED:
             {
                 listenerContainer.forEach(l -> {
                     l.initialized();
