Repository: curator Updated Branches: refs/heads/CURATOR-397 55df07e6a -> 93f11ed09
1. remove siblings added childrenAsZNodes 2. Add util to unwrap stages of ZNodes Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/93f11ed0 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/93f11ed0 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/93f11ed0 Branch: refs/heads/CURATOR-397 Commit: 93f11ed09e0cc6e9c6b80b1a88796d027f4f3d7a Parents: 55df07e Author: randgalt <[email protected]> Authored: Sat Jun 10 19:52:15 2017 -0500 Committer: randgalt <[email protected]> Committed: Sat Jun 10 19:52:15 2017 -0500 ---------------------------------------------------------------------- .../x/async/modeled/ModeledFramework.java | 9 +++-- .../apache/curator/x/async/modeled/ZNode.java | 27 +++++++++++++- .../modeled/cached/CachedModeledFramework.java | 9 +++++ .../details/CachedModeledFrameworkImpl.java | 22 ++++++++---- .../x/async/modeled/details/ModelStage.java | 10 ++++++ .../async/modeled/details/ModeledCacheImpl.java | 5 +++ .../modeled/details/ModeledFrameworkImpl.java | 38 ++++++++++++++++++-- 7 files changed, 108 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java index d9ce43d..a53d22b 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java @@ -280,12 +280,17 @@ public interface ModeledFramework<T> AsyncStage<List<ZPath>> children(); /** - * Return the child paths of this instance's parent path 
(in no particular order) + * Return the child paths of this instance's path (in no particular order) + * and deserialize each into a model. IMPORTANT: this results in a ZooKeeper query + * for each child node returned. i.e. if the initial children() call returns + * 10 nodes an additional 10 ZooKeeper queries are made to get the data. Note: + * cannot be used if any of the {@link ModeledFrameworkBuilder#watched()} modes + * are used. * * @return AsyncStage * @see org.apache.curator.x.async.AsyncStage */ - AsyncStage<List<ZPath>> siblings(); + AsyncStage<List<ZNode<T>>> childrenAsZNodes(); /** * Create operation instance that can be passed among other operations to http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java index 7ed6ef5..0d34d82 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java @@ -18,8 +18,11 @@ */ package org.apache.curator.x.async.modeled; -import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.data.Stat; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; /** * Abstracts a ZooKeeper node @@ -46,4 +49,26 @@ public interface ZNode<T> * @return model */ T model(); + + /** + * Utility that modifies an async stage of znodes into an async stage of models + * + * @param from original stage + * @return stage of models + */ + static <T> CompletionStage<List<T>> models(AsyncStage<List<ZNode<T>>> from) + { + return from.thenApply(nodes -> nodes.stream().map(ZNode::model).collect(Collectors.toList())); + } + + 
/** + * Utility that modifies an async stage of a znode into an async stage of a model + * + * @param from original stage + * @return stage of a model + */ + static <T> CompletionStage<T> model(AsyncStage<ZNode<T>> from) + { + return from.thenApply(ZNode::model); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java index 8ab5e8b..8acbebb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java @@ -65,6 +65,15 @@ public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeabl Listenable<ModeledCacheListener<T>> listenable(); /** + * Same as {@link org.apache.curator.x.async.modeled.ModeledFramework#childrenAsZNodes()} + * but always reads from cache - i.e. 
no additional queries to ZooKeeper are made + * + * @return AsyncStage stage + */ + @Override + AsyncStage<List<ZNode<T>>> childrenAsZNodes(); + + /** * {@inheritDoc} */ @Override http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 4884af4..2a7fd5f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.async.modeled.details; -import com.google.common.collect.Lists; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.framework.listen.Listenable; @@ -36,8 +35,8 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -254,15 +253,24 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public AsyncStage<List<ZPath>> children() { - Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet(); - return completed(Lists.newArrayList(paths)); + List<ZPath> paths = cache.currentChildren(client.modelSpec().path()) + .keySet() + .stream() + .filter(path -> path.equals(cache.basePath())) + 
.collect(Collectors.toList()); + return completed(paths); } @Override - public AsyncStage<List<ZPath>> siblings() + public AsyncStage<List<ZNode<T>>> childrenAsZNodes() { - Set<ZPath> paths = cache.currentChildren(client.modelSpec().path().parent()).keySet(); - return completed(Lists.newArrayList(paths)); + List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path()) + .entrySet() + .stream() + .filter(e -> e.getKey().equals(cache.basePath())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return completed(nodes); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java index dfeb5d1..27047ec 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java @@ -32,6 +32,11 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> { private final CompletionStage<WatchedEvent> event; + static <U> ModelStage<U> make() + { + return new ModelStage<>(null); + } + static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event) { return new ModelStage<>(event); @@ -51,6 +56,11 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> return stage; } + static <U> ModelStage<U> async(Executor executor) + { + return new AsyncModelStage<>(executor); + } + static <U> ModelStage<U> asyncCompleted(U value, Executor executor) { ModelStage<U> stage = new AsyncModelStage<>(executor); 
http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 466c5e9..4c281aa 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -108,6 +108,11 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> return Optional.empty(); } + ZPath basePath() + { + return basePath; + } + Map<ZPath, ZNode<T>> currentChildren() { return currentChildren(basePath); http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 453112d..422ae61 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -19,6 +19,7 @@ package org.apache.curator.x.async.modeled.details; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorOp; @@ -229,9 +230,42 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T> } @Override - public AsyncStage<List<ZPath>> siblings() + public AsyncStage<List<ZNode<T>>> childrenAsZNodes() { - return internalGetChildren(modelSpec.path().parent()); + ModelStage<List<ZNode<T>>> modelStage = ModelStage.make(); + Preconditions.checkState(!isWatched, "childrenAsZNodes() cannot be used with watched instances."); + children().handle((children, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + completeChildrenAsZNodes(modelStage, children); + } + return null; + }); + return modelStage; + } + + private void completeChildrenAsZNodes(ModelStage<List<ZNode<T>>> modelStage, List<ZPath> children) + { + List<ZNode<T>> nodes = Lists.newArrayList(); + children.forEach(name -> child(name).readAsZNode().handle((node, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + nodes.add(node); + if ( nodes.size() == children.size() ) + { + modelStage.complete(nodes); + } + } + return null; + })); } private AsyncStage<List<ZPath>> internalGetChildren(ZPath path)
