Repository: curator Updated Branches: refs/heads/master 1b6216e97 -> 123f2ece5
Added asyncEnsureParents() Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/123f2ece Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/123f2ece Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/123f2ece Branch: refs/heads/master Commit: 123f2ece539f924705945f462030ec2b9692aebd Parents: 1b6216e Author: randgalt <[email protected]> Authored: Sat Jul 15 11:03:23 2017 -0500 Committer: randgalt <[email protected]> Committed: Sat Jul 15 11:03:23 2017 -0500 ---------------------------------------------------------------------- .../apache/curator/x/async/AsyncWrappers.java | 132 +++++++++++++++++-- 1 file changed, 120 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/123f2ece/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java index 9630985..f26b3b4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java @@ -18,13 +18,16 @@ */ package org.apache.curator.x.async; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.api.ExistsOption; -import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.KeeperException; import java.util.Collections; -import java.util.Set; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -70,8 +73,79 @@ import java.util.concurrent.TimeUnit; public class AsyncWrappers { /** - * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using - * the given executor + * <p> + * Return the children of the given path (keyed by the full path) and the data for each node. + * IMPORTANT: this results in a ZooKeeper query + * for each child node returned. i.e. if the initial children() call returns + * 10 nodes an additional 10 ZooKeeper queries are made to get the data. + * </p> + * + * <p> + * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException} + * is <strong>NOT</strong> set. Instead the stage is completed with an empty map. + * </p> + * + * @return CompletionStage + */ + public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path) + { + return childrenWithData(client, path, false); + } + + /** + * <p> + * Return the children of the given path (keyed by the full path) and the data for each node. + * IMPORTANT: this results in a ZooKeeper query + * for each child node returned. i.e. if the initial children() call returns + * 10 nodes an additional 10 ZooKeeper queries are made to get the data. + * </p> + * + * <p> + * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException} + * is <strong>NOT</strong> set. Instead the stage is completed with an empty map. + * </p> + * + * @param isCompressed pass true if data is compressed + * @return CompletionStage + */ + public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed) + { + CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>(); + client.getChildren().forPath(path).handle((children, e) -> { + if ( e != null ) + { + if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException ) + { + future.complete(Maps.newHashMap()); + } + else + { + future.completeExceptionally(e); + } + } + else + { + completeChildren(client, future, path, children, isCompressed); + } + return null; + }); + return future; + } + + /** + * Asynchronously ensure that the parents of the given path are created + * + * @param client client + * @param path path to ensure + * @return stage + */ + public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path) + { + return ensure(client, path, ExistsOption.createParentsIfNeeded); + } + + /** + * Asynchronously ensure that the parents of the given path are created as containers * * @param client client * @param path path to ensure @@ -79,14 +153,7 @@ public class AsyncWrappers */ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path) { - String localPath = ZKPaths.makePath(path, "foo"); - Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers); - return client - .checkExists() - .withOptions(options) - .forPath(localPath) - .thenApply(__ -> null) - ; + return ensure(client, path, ExistsOption.createParentsAsContainers); } /** @@ -279,6 +346,47 @@ public class AsyncWrappers } } + private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed) + { + Map<String, byte[]> nodes = Maps.newHashMap(); + if ( children.size() == 0 ) + { + future.complete(nodes); + return; + } + + children.forEach(node -> { + String path = ZKPaths.makePath(parentPath, node); + AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path); + stage.handle((data, e) -> { + if ( e != null ) + { + future.completeExceptionally(e); + } + else + { + nodes.put(path, data); + if ( nodes.size() == children.size() ) + { + future.complete(nodes); + } + } + return null; + }); + }); + } + + private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option) + { + String localPath = ZKPaths.makePath(path, "foo"); + return client + .checkExists() + .withOptions(Collections.singleton(option)) + .forPath(localPath) + .thenApply(__ -> null) + ; + } + private AsyncWrappers() { }
