Repository: curator Updated Branches: refs/heads/CURATOR-397 a4636098f -> 16fb7b18b
Added new method of integrated caching. Needs testing, etc. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/16fb7b18 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/16fb7b18 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/16fb7b18 Branch: refs/heads/CURATOR-397 Commit: 16fb7b18b77153578dd489d0e1509a11eff0c6f5 Parents: a463609 Author: randgalt <[email protected]> Authored: Sun Apr 30 15:36:18 2017 -0500 Committer: randgalt <[email protected]> Committed: Sun Apr 30 15:36:18 2017 -0500 ---------------------------------------------------------------------- .../modeled/CachedModeledCuratorFramework.java | 25 +++ .../async/modeled/ModeledCuratorFramework.java | 28 +++ .../CachedModeledCuratorFrameworkImpl.java | 223 +++++++++++++++++++ .../details/ModeledCuratorFrameworkImpl.java | 43 ++++ .../recipes/ModeledPathChildrenCacheImpl.java | 6 +- .../x/async/modeled/recipes/ModeledCache.java | 17 ++ .../recipes/ModeledPathChildrenCache.java | 14 +- .../async/modeled/recipes/ModeledTreeCache.java | 13 +- 8 files changed, 342 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/CachedModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/CachedModeledCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/CachedModeledCuratorFramework.java new file mode 100644 index 0000000..05ce936 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/CachedModeledCuratorFramework.java @@ -0,0 +1,25 @@ +package org.apache.curator.x.async.modeled; + +import org.apache.curator.x.async.modeled.recipes.ModeledCache; +import java.io.Closeable; + +public interface CachedModeledCuratorFramework<T> extends ModeledCuratorFramework<T>, Closeable +{ + /** + * Return the cache instance + * + * @return cache + */ + ModeledCache<T> getCache(); + + /** + * Start the internally created via {@link #cached()} + */ + void start(); + + /** + * Close the internally created via {@link #cached()} + */ + @Override + void close(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java index f4def2b..2a214b7 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.recipes.ModeledCache; import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Map; @@ -53,6 +54,26 @@ public interface ModeledCuratorFramework<T> } /** + * Use the given cache as a front for this modeled instance. All read APIs check the cache + * first and, if available, return the values from the cache. Note: you must call + * {@link org.apache.curator.x.async.modeled.CachedModeledCuratorFramework#start()} and + * {@link CachedModeledCuratorFramework#close()} to start/stop + * the cache + * + * @param cache cache to use + * @return wrapped instance + */ + CachedModeledCuratorFramework<T> cached(ModeledCache<T> cache); + + /** + * Use the an internally created cache as a front for this modeled instance. All read APIs check the cache + * first and, if available, return the values from the cache + * + * @return wrapped instance + */ + CachedModeledCuratorFramework<T> cached(); + + /** * Returns the client that was originally passed to {@link #wrap(org.apache.curator.framework.CuratorFramework, CuratorModelSpec)} or * the builder. * @@ -61,6 +82,13 @@ public interface ModeledCuratorFramework<T> CuratorFramework unwrap(); /** + * Return the model being used + * + * @return model + */ + CuratorModelSpec<T> modelSpec(); + + /** * Return a new Modeled Curator instance with all the same options but applying to the given child node of this Modeled Curator's * path. E.g. if this Modeled Curator instance applies to "/a/b", calling <code>modeled.at("c")</code> returns an instance that applies to * "/a/b/c". http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledCuratorFrameworkImpl.java new file mode 100644 index 0000000..8af3e36 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledCuratorFrameworkImpl.java @@ -0,0 +1,223 @@ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.CachedModeledCuratorFramework; +import org.apache.curator.x.async.modeled.CuratorModelSpec; +import org.apache.curator.x.async.modeled.ModeledCuratorFramework; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.recipes.ModeledCache; +import org.apache.curator.x.async.modeled.recipes.ModeledCachedNode; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +class CachedModeledCuratorFrameworkImpl<T> implements CachedModeledCuratorFramework<T> +{ + private final ModeledCuratorFramework<T> client; + private final ModeledCache<T> cache; + private final ZPath path; + + CachedModeledCuratorFrameworkImpl(ModeledCuratorFramework<T> client, ModeledCache<T> cache, ZPath path) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.cache = Objects.requireNonNull(cache, "cache cannot be null"); + this.path = Objects.requireNonNull(path, "path cannot be null"); + } + + @Override + public ModeledCache<T> getCache() + { + return cache; + } + + @Override + public void start() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + throw new UnsupportedOperationException(); + } + + @Override + public CuratorModelSpec<T> modelSpec() + { + return client.modelSpec(); + } + + @Override + public CachedModeledCuratorFramework<T> cached(ModeledCache<T> cache) + { + throw new UnsupportedOperationException(); + } + + @Override + public CachedModeledCuratorFramework<T> cached() + { + throw new UnsupportedOperationException(); + } + + @Override + public CuratorFramework unwrap() + { + return client.unwrap(); + } + + @Override + public ModeledCuratorFramework<T> at(String child) + { + return new CachedModeledCuratorFrameworkImpl<>(client.at(child), cache, path.at(child)); + } + + @Override + public AsyncStage<String> set(T model) + { + return client.set(model); + } + + @Override + public AsyncStage<String> set(T model, Stat storingStatIn) + { + return client.set(model, storingStatIn); + } + + @Override + public AsyncStage<T> read() + { + return read(null); + } + + @Override + public AsyncStage<T> read(Stat storingStatIn) + { + Optional<ModeledCachedNode<T>> data = cache.getCurrentData(path); + if ( data.isPresent() ) + { + ModeledCachedNode<T> localData = data.get(); + T model = localData.getModel(); + if ( model != null ) + { + if ( (storingStatIn != null) && (localData.getStat() != null) ) + { + DataTree.copyStat(localData.getStat(), storingStatIn); + } + return new ModelStage<>(model); + } + } + return (storingStatIn != null) ? client.read(storingStatIn) : client.read(); + } + + @Override + public AsyncStage<Stat> update(T model) + { + return client.update(model); + } + + @Override + public AsyncStage<Stat> update(T model, int version) + { + return client.update(model, version); + } + + @Override + public AsyncStage<Void> delete() + { + return client.delete(); + } + + @Override + public AsyncStage<Void> delete(int version) + { + return client.delete(version); + } + + @Override + public AsyncStage<Stat> checkExists() + { + Optional<ModeledCachedNode<T>> data = cache.getCurrentData(path); + return data.map(node -> (AsyncStage<Stat>)new ModelStage<>(node.getStat())).orElseGet(client::checkExists); + } + + @Override + public AsyncStage<List<ZPath>> getChildren() + { + return client.getChildren(); + } + + @Override + public AsyncStage<Map<ZPath, AsyncStage<T>>> readChildren() + { + ModelStage<Map<ZPath, AsyncStage<T>>> modelStage = new ModelStage<>(); + client.getChildren().whenComplete((children, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + Map<ZPath, AsyncStage<T>> map = children.stream().collect(Collectors.toMap(Function.identity(), path1 -> at(path1.nodeName()).read())); + modelStage.complete(map); + } + }); + return modelStage; + } + + @Override + public CuratorOp createOp(T model) + { + return client.createOp(model); + } + + @Override + public CuratorOp updateOp(T model) + { + return client.updateOp(model); + } + + @Override + public CuratorOp updateOp(T model, int version) + { + return client.updateOp(model, version); + } + + @Override + public CuratorOp deleteOp() + { + return client.deleteOp(); + } + + @Override + public CuratorOp deleteOp(int version) + { + return client.deleteOp(version); + } + + @Override + public CuratorOp checkExistsOp() + { + return client.checkExistsOp(); + } + + @Override + public CuratorOp checkExistsOp(int version) + { + return client.checkExistsOp(version); + } + + @Override + public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations) + { + return client.inTransaction(operations); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java index c68f693..c7ab3fc 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; @@ -33,9 +34,12 @@ import org.apache.curator.x.async.api.AsyncPathable; import org.apache.curator.x.async.api.AsyncTransactionSetDataBuilder; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; +import org.apache.curator.x.async.modeled.CachedModeledCuratorFramework; import org.apache.curator.x.async.modeled.CuratorModelSpec; import org.apache.curator.x.async.modeled.ModeledCuratorFramework; import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.recipes.ModeledCache; +import org.apache.curator.x.async.modeled.recipes.ModeledTreeCache; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -96,6 +100,45 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } @Override + public CachedModeledCuratorFramework<T> cached(ModeledCache<T> cache) + { + return new CachedModeledCuratorFrameworkImpl<>(this, cache, modelSpec.path()); + } + + @Override + public CachedModeledCuratorFramework<T> cached() + { + TreeCache.Builder builder = TreeCache.newBuilder(client.unwrap(), modelSpec.path().fullPath()); + builder = builder.setCacheData(true); + if ( modelSpec.createOptions().contains(CreateOption.compress) ) + { + builder = builder.setDataIsCompressed(true); + } + TreeCache cache = builder.build(); + ModeledTreeCache<T> wrapped = ModeledTreeCache.wrap(cache, modelSpec.serializer()); + return new CachedModeledCuratorFrameworkImpl<T>(this, wrapped, modelSpec.path()) + { + @Override + public void start() + { + wrapped.start(); + } + + @Override + public void close() + { + wrapped.close(); + } + }; + } + + @Override + public CuratorModelSpec<T> modelSpec() + { + return modelSpec; + } + + @Override public CuratorFramework unwrap() { return client.unwrap(); http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java index b04cf02..a82737e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/recipes/ModeledPathChildrenCacheImpl.java @@ -161,7 +161,7 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache } @Override - public List<ModeledCachedNode> getCurrentData() + public List<ModeledCachedNode<T>> getCurrentData() { return cache.getCurrentData().stream() .map(data -> from(serializer, data)) @@ -169,9 +169,9 @@ public class ModeledPathChildrenCacheImpl<T> implements ModeledPathChildrenCache } @Override - public Optional<ModeledCachedNode> getCurrentData(String fullPath) + public Optional<ModeledCachedNode<T>> getCurrentData(ZPath fullPath) { - return Optional.ofNullable(from(serializer, cache.getCurrentData(fullPath))); + return Optional.ofNullable(from(serializer, cache.getCurrentData(fullPath.fullPath()))); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCache.java new file mode 100644 index 0000000..54e4102 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledCache.java @@ -0,0 +1,17 @@ +package org.apache.curator.x.async.modeled.recipes; + +import org.apache.curator.x.async.modeled.ZPath; +import java.util.Optional; + +public interface ModeledCache<T> +{ + /** + * Return the modeled current data for the given path. There are no guarantees of accuracy. This is + * merely the most recent view of the data. If there is no node at the given path, + * {@link java.util.Optional#empty()} is returned. + * + * @param fullPath full path to the node to check + * @return data if the node is alive, or null + */ + Optional<ModeledCachedNode<T>> getCurrentData(ZPath fullPath); +} http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledPathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledPathChildrenCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledPathChildrenCache.java index 32ed58f..7b54fb7 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledPathChildrenCache.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledPathChildrenCache.java @@ -31,7 +31,7 @@ import java.util.Optional; * Wraps a {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so that * node data can be viewed as strongly typed models. */ -public interface ModeledPathChildrenCache<T> extends Closeable +public interface ModeledPathChildrenCache<T> extends ModeledCache<T>, Closeable { /** * Return a newly wrapped cache @@ -85,17 +85,7 @@ public interface ModeledPathChildrenCache<T> extends Closeable * * @return list of children and data */ - List<ModeledCachedNode> getCurrentData(); - - /** - * Return the modeled current data for the given path. There are no guarantees of accuracy. This is - * merely the most recent view of the data. If there is no child with that path - * {@link java.util.Optional#empty()} is returned. - * - * @param fullPath full path to the node to check - * @return data or null - */ - Optional<ModeledCachedNode> getCurrentData(String fullPath); + List<ModeledCachedNode<T>> getCurrentData(); /** * Forwards to {@link org.apache.curator.framework.recipes.cache.PathChildrenCache#clearDataBytes(String)} http://git-wip-us.apache.org/repos/asf/curator/blob/16fb7b18/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledTreeCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledTreeCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledTreeCache.java index c7a32c2..a481a0d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledTreeCache.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/recipes/ModeledTreeCache.java @@ -25,13 +25,12 @@ import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.details.recipes.ModeledTreeCacheImpl; import java.io.Closeable; import java.util.Map; -import java.util.Optional; /** * Wraps a {@link org.apache.curator.framework.recipes.cache.TreeCache} so that * node data can be viewed as strongly typed models. */ -public interface ModeledTreeCache<T> extends Closeable +public interface ModeledTreeCache<T> extends ModeledCache<T>, Closeable { /** * Return a newly wrapped cache @@ -77,14 +76,4 @@ public interface ModeledTreeCache<T> extends Closeable * @return a possibly-empty list of children if the node is alive, or null */ Map<ZPath, ModeledCachedNode<T>> getCurrentChildren(ZPath fullPath); - - /** - * Return the modeled current data for the given path. There are no guarantees of accuracy. This is - * merely the most recent view of the data. If there is no node at the given path, - * {@link java.util.Optional#empty()} is returned. - * - * @param fullPath full path to the node to check - * @return data if the node is alive, or null - */ - Optional<ModeledCachedNode<T>> getCurrentData(ZPath fullPath); }
