Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 55df07e6a -> 93f11ed09


1. remove siblings added childrenAsZNodes
2. Add util to unwrap stages of ZNodes


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/93f11ed0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/93f11ed0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/93f11ed0

Branch: refs/heads/CURATOR-397
Commit: 93f11ed09e0cc6e9c6b80b1a88796d027f4f3d7a
Parents: 55df07e
Author: randgalt <[email protected]>
Authored: Sat Jun 10 19:52:15 2017 -0500
Committer: randgalt <[email protected]>
Committed: Sat Jun 10 19:52:15 2017 -0500

----------------------------------------------------------------------
 .../x/async/modeled/ModeledFramework.java       |  9 +++--
 .../apache/curator/x/async/modeled/ZNode.java   | 27 +++++++++++++-
 .../modeled/cached/CachedModeledFramework.java  |  9 +++++
 .../details/CachedModeledFrameworkImpl.java     | 22 ++++++++----
 .../x/async/modeled/details/ModelStage.java     | 10 ++++++
 .../async/modeled/details/ModeledCacheImpl.java |  5 +++
 .../modeled/details/ModeledFrameworkImpl.java   | 38 ++++++++++++++++++--
 7 files changed, 108 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
index d9ce43d..a53d22b 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java
@@ -280,12 +280,17 @@ public interface ModeledFramework<T>
     AsyncStage<List<ZPath>> children();
 
     /**
-     * Return the child paths of this instance's parent path (in no particular 
order)
+     * Return the child paths of this instance's path (in no particular order)
+     * and deserialize into a models. IMPORTANT: this results in a ZooKeeper 
query
+     * for each child node returned. i.e. if the initial children() call 
returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data. 
Note:
+     * cannot be used if any of the {@link ModeledFrameworkBuilder#watched()} 
modes
+     * are used.
      *
      * @return AsyncStage
      * @see org.apache.curator.x.async.AsyncStage
      */
-    AsyncStage<List<ZPath>> siblings();
+    AsyncStage<List<ZNode<T>>> childrenAsZNodes();
 
     /**
      * Create operation instance that can be passed among other operations to

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
index 7ed6ef5..0d34d82 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java
@@ -18,8 +18,11 @@
  */
 package org.apache.curator.x.async.modeled;
 
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.data.Stat;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
 
 /**
  * Abstracts a ZooKeeper node
@@ -46,4 +49,26 @@ public interface ZNode<T>
      * @return model
      */
     T model();
+
+    /**
+     * Utility that modifies an async stage of znodes into an async stage of 
models
+     *
+     * @param from original stage
+     * @return stage of models
+     */
+    static <T> CompletionStage<List<T>> models(AsyncStage<List<ZNode<T>>> from)
+    {
+        return from.thenApply(nodes -> 
nodes.stream().map(ZNode::model).collect(Collectors.toList()));
+    }
+
+    /**
+     * Utility that modifies an async stage of a znode into an async stage of 
a model
+     *
+     * @param from original stage
+     * @return stage of a model
+     */
+    static <T> CompletionStage<T> model(AsyncStage<ZNode<T>> from)
+    {
+        return from.thenApply(ZNode::model);
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
index 8ab5e8b..8acbebb 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
@@ -65,6 +65,15 @@ public interface CachedModeledFramework<T> extends 
ModeledFramework<T>, Closeabl
     Listenable<ModeledCacheListener<T>> listenable();
 
     /**
+     * Same as {@link 
org.apache.curator.x.async.modeled.ModeledFramework#childrenAsZNodes()}
+     * but always reads from cache - i.e. no additional queries to ZooKeeper 
are made
+     *
+     * @return AsyncStage stage
+     */
+    @Override
+    AsyncStage<List<ZNode<T>>> childrenAsZNodes();
+
+    /**
      * {@inheritDoc}
      */
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 4884af4..2a7fd5f 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.async.modeled.details;
 
-import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
 import org.apache.curator.framework.listen.Listenable;
@@ -36,8 +35,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
@@ -254,15 +253,24 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
     @Override
     public AsyncStage<List<ZPath>> children()
     {
-        Set<ZPath> paths = 
cache.currentChildren(client.modelSpec().path()).keySet();
-        return completed(Lists.newArrayList(paths));
+        List<ZPath> paths = cache.currentChildren(client.modelSpec().path())
+            .keySet()
+            .stream()
+            .filter(path -> path.equals(cache.basePath()))
+            .collect(Collectors.toList());
+        return completed(paths);
     }
 
     @Override
-    public AsyncStage<List<ZPath>> siblings()
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
     {
-        Set<ZPath> paths = 
cache.currentChildren(client.modelSpec().path().parent()).keySet();
-        return completed(Lists.newArrayList(paths));
+        List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path())
+            .entrySet()
+            .stream()
+            .filter(e -> e.getKey().equals(cache.basePath()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+        return completed(nodes);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
index dfeb5d1..27047ec 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -32,6 +32,11 @@ class ModelStage<T> extends CompletableFuture<T> implements 
AsyncStage<T>
 {
     private final CompletionStage<WatchedEvent> event;
 
+    static <U> ModelStage<U> make()
+    {
+        return new ModelStage<>(null);
+    }
+
     static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event)
     {
         return new ModelStage<>(event);
@@ -51,6 +56,11 @@ class ModelStage<T> extends CompletableFuture<T> implements 
AsyncStage<T>
         return stage;
     }
 
+    static <U> ModelStage<U> async(Executor executor)
+    {
+        return new AsyncModelStage<>(executor);
+    }
+
     static <U> ModelStage<U> asyncCompleted(U value, Executor executor)
     {
         ModelStage<U> stage = new AsyncModelStage<>(executor);

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index 466c5e9..4c281aa 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -108,6 +108,11 @@ class ModeledCacheImpl<T> implements TreeCacheListener, 
ModeledCache<T>
         return Optional.empty();
     }
 
+    ZPath basePath()
+    {
+        return basePath;
+    }
+
     Map<ZPath, ZNode<T>> currentChildren()
     {
         return currentChildren(basePath);

http://git-wip-us.apache.org/repos/asf/curator/blob/93f11ed0/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 453112d..422ae61 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -19,6 +19,7 @@
 package org.apache.curator.x.async.modeled.details;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -229,9 +230,42 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
     }
 
     @Override
-    public AsyncStage<List<ZPath>> siblings()
+    public AsyncStage<List<ZNode<T>>> childrenAsZNodes()
     {
-        return internalGetChildren(modelSpec.path().parent());
+        ModelStage<List<ZNode<T>>> modelStage = ModelStage.make();
+        Preconditions.checkState(!isWatched, "childrenAsZNodes() cannot be 
used with watched instances.");
+        children().handle((children, e) -> {
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                completeChildrenAsZNodes(modelStage, children);
+            }
+            return null;
+        });
+        return modelStage;
+    }
+
+    private void completeChildrenAsZNodes(ModelStage<List<ZNode<T>>> 
modelStage, List<ZPath> children)
+    {
+        List<ZNode<T>> nodes = Lists.newArrayList();
+        children.forEach(name -> child(name).readAsZNode().handle((node, e) -> 
{
+            if ( e != null )
+            {
+                modelStage.completeExceptionally(e);
+            }
+            else
+            {
+                nodes.add(node);
+                if ( nodes.size() == children.size() )
+                {
+                    modelStage.complete(nodes);
+                }
+            }
+            return null;
+        }));
     }
 
     private AsyncStage<List<ZPath>> internalGetChildren(ZPath path)

Reply via email to