A few things for CURATOR-397: 1. AsyncWrappers.asyncEnsureContainers was just wrong - this is a better implementation. 2. Added a raw (pass-through) serializer constant. 3. Added ModeledOptions, which can be expanded in the future. For now it just has ignoreMissingNodesForChildren.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11be719b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11be719b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11be719b Branch: refs/heads/CURATOR-419 Commit: 11be719b32bcf8879c44a0c2005ba5a2107986cb Parents: 4a0e022 Author: randgalt <[email protected]> Authored: Fri Jul 14 08:13:03 2017 -0500 Committer: randgalt <[email protected]> Committed: Fri Jul 14 08:16:01 2017 -0500 ---------------------------------------------------------------------- .../apache/curator/x/async/AsyncWrappers.java | 38 +++++++------------- .../x/async/modeled/ModelSerializer.java | 18 ++++++++++ .../async/modeled/ModeledFrameworkBuilder.java | 20 ++++++++++- .../curator/x/async/modeled/ModeledOptions.java | 29 +++++++++++++++ .../modeled/details/ModeledFrameworkImpl.java | 33 +++++++++++++---- 5 files changed, 104 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java index e982cf2..7da82fc 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java @@ -20,7 +20,10 @@ package org.apache.curator.x.async; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.x.async.api.ExistsOption; import org.apache.curator.x.async.modeled.ZPath; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -67,38 +70,21 @@ public class AsyncWrappers { /** * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using - * the {@link java.util.concurrent.ForkJoinPool#commonPool()}. - * - * @param client client - * @param path path to ensure - * @return stage - */ - public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path) - { - return asyncEnsureContainers(client, path, null); - } - - /** - * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using * the given executor * * @param client client * @param path path to ensure * @return stage */ - public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor) + public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path) { - Runnable proc = () -> { - try - { - client.unwrap().createContainers(path.fullPath()); - } - catch ( Exception e ) - { - throw new RuntimeException(e); - } - }; - return (executor != null) ? 
CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc); + Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers); + return client + .checkExists() + .withOptions(options) + .forPath(path.child("foo").fullPath()) + .thenApply(__ -> null) + ; } /** @@ -284,7 +270,7 @@ public class AsyncWrappers future.complete(null); } } - catch ( Exception e ) + catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); future.completeExceptionally(e); http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java index 428096e..476f314 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java @@ -40,4 +40,22 @@ public interface ModelSerializer<T> * @throws RuntimeException if <code>bytes</code> is invalid or there was an error deserializing */ T deserialize(byte[] bytes); + + /** + * A pass through serializer + */ + ModelSerializer<byte[]> raw = new ModelSerializer<byte[]>() + { + @Override + public byte[] serialize(byte[] model) + { + return model; + } + + @Override + public byte[] deserialize(byte[] bytes) + { + return bytes; + } + }; } http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java 
index 2e8bec3..1df68e6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java @@ -18,13 +18,16 @@ */ package org.apache.curator.x.async.modeled; +import com.google.common.collect.ImmutableSet; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl; import org.apache.zookeeper.WatchedEvent; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.function.UnaryOperator; public class ModeledFrameworkBuilder<T> @@ -35,6 +38,7 @@ public class ModeledFrameworkBuilder<T> private UnaryOperator<WatchedEvent> watcherFilter; private UnhandledErrorListener unhandledErrorListener; private UnaryOperator<CuratorEvent> resultFilter; + private Set<ModeledOptions> modeledOptions; /** * Build a new ModeledFramework instance @@ -49,7 +53,8 @@ public class ModeledFrameworkBuilder<T> watchMode, watcherFilter, unhandledErrorListener, - resultFilter + resultFilter, + modeledOptions ); } @@ -142,6 +147,18 @@ public class ModeledFrameworkBuilder<T> return this; } + /** + * Change the modeled options + * + * @param modeledOptions new options set + * @return this for chaining + */ + public ModeledFrameworkBuilder<T> withOptions(Set<ModeledOptions> modeledOptions) + { + this.modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "client cannot be null")); + return this; + } + ModeledFrameworkBuilder() { } @@ -150,5 +167,6 @@ public class ModeledFrameworkBuilder<T> { this.client = Objects.requireNonNull(client, "client cannot be null"); this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null"); + modeledOptions = 
Collections.singleton(ModeledOptions.ignoreMissingNodesForChildren); } } http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java new file mode 100644 index 0000000..434894b --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.x.async.modeled; + +public enum ModeledOptions +{ + /** + * Causes {@link ModeledFramework#children()} and {@link ModeledFramework#childrenAsZNodes()} + * to ignore {@link org.apache.zookeeper.KeeperException.NoNodeException} and merely return + * an empty list + */ + ignoreMissingNodesForChildren +} http://git-wip-us.apache.org/repos/asf/curator/blob/11be719b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index c1d19c4..44011ee 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -19,6 +19,8 @@ package org.apache.curator.x.async.modeled.details; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; @@ -36,13 +38,16 @@ import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ModeledOptions; import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; +import org.apache.zookeeper.KeeperException; import 
org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -62,13 +67,15 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> private final UnaryOperator<CuratorEvent> resultFilter; private final AsyncCuratorFrameworkDsl dslClient; private final boolean isWatched; + private final Set<ModeledOptions> modeledOptions; - public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter) + public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions) { boolean isWatched = (watchMode != null); Objects.requireNonNull(client, "client cannot be null"); Objects.requireNonNull(model, "model cannot be null"); + modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null")); watchMode = (watchMode != null) ? 
watchMode : WatchMode.stateChangeAndSuccess; @@ -84,11 +91,12 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> watcherFilter, unhandledErrorListener, resultFilter, - isWatched + isWatched, + modeledOptions ); } - private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched) + private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions) { this.client = client; this.dslClient = dslClient; @@ -99,6 +107,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> this.unhandledErrorListener = unhandledErrorListener; this.resultFilter = resultFilter; this.isWatched = isWatched; + this.modeledOptions = modeledOptions; } @Override @@ -280,7 +289,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> asyncStage.whenComplete((children, e) -> { if ( e != null ) { - modelStage.completeExceptionally(e); + if ( modeledOptions.contains(ModeledOptions.ignoreMissingNodesForChildren) && (Throwables.getRootCause(e) instanceof KeeperException.NoNodeException) ) + { + modelStage.complete(Collections.emptyList()); + } + else + { + modelStage.completeExceptionally(e); + } } else { @@ -303,7 +319,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> watcherFilter, unhandledErrorListener, resultFilter, - isWatched + isWatched, + modeledOptions ); } @@ -320,7 +337,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> 
watcherFilter, unhandledErrorListener, resultFilter, - isWatched + isWatched, + modeledOptions ); } @@ -337,7 +355,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> watcherFilter, unhandledErrorListener, resultFilter, - isWatched + isWatched, + modeledOptions ); }
