Repository: curator Updated Branches: refs/heads/CURATOR-397 29b09ceb5 -> f2370b771
1. Allow for an Executor service to be passed to the cache 2. CachedModeledFramework must complete the stages via a thread as the caller expects async processing Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f2370b77 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f2370b77 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f2370b77 Branch: refs/heads/CURATOR-397 Commit: f2370b7710b85850936f48404c6959d45ed03626 Parents: 29b09ce Author: randgalt <[email protected]> Authored: Thu May 4 23:15:14 2017 -0500 Committer: randgalt <[email protected]> Committed: Thu May 4 23:15:14 2017 -0500 ---------------------------------------------------------------------- .../x/async/modeled/ModeledFramework.java | 24 +++++++++-- .../details/CachedModeledFrameworkImpl.java | 42 +++++++++++++++----- .../x/async/modeled/details/ModelStage.java | 12 ------ .../async/modeled/details/ModeledCacheImpl.java | 7 +++- .../modeled/details/ModeledFrameworkImpl.java | 10 ++++- .../confluence/modeled-components.confluence | 4 +- .../site/confluence/modeled-typed.confluence | 4 +- 7 files changed, 71 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java index 70671ea..f8cf4c7 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java @@ -25,6 +25,7 @@ import org.apache.curator.x.async.AsyncStage; import 
org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.zookeeper.data.Stat; import java.util.List; +import java.util.concurrent.ExecutorService; public interface ModeledFramework<T> { @@ -65,16 +66,31 @@ public interface ModeledFramework<T> } /** - * Use an internally created cache as a front for this modeled instance. All read APIs use the internal - * cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call - * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and - * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop + * <p> + * Use an internally created cache as a front for this modeled instance. All read APIs use the internal + * cache. i.e. read calls always use the cache instead of making direct queries. Note: you must call + * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#start()} and + * {@link org.apache.curator.x.async.modeled.cached.CachedModeledFramework#close()} to start/stop + * </p> + * + * <p> + * Note: this method internally allocates an Executor for the cache and read methods. Use + * {@link #cached(java.util.concurrent.ExecutorService)} if you'd like to provide your own executor service. + * </p> * * @return wrapped instance */ CachedModeledFramework<T> cached(); /** + * Same as {@link #cached()} but allows for providing an executor service + * + * @param executor thread pool to use for the cache and for read operations + * @return wrapped instance + */ + CachedModeledFramework<T> cached(ExecutorService executor); + + /** * Returns the client that was originally passed to {@link #wrap(org.apache.curator.x.async.AsyncCuratorFramework, ModelSpec)} or * the builder. 
* http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index ba11bc2..2209d37 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -37,21 +37,25 @@ import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { private final ModeledFramework<T> client; private final ModeledCacheImpl<T> cache; + private final Executor executor; - CachedModeledFrameworkImpl(ModeledFramework<T> client) + CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec())); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); } - private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache) + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) { this.client = client; this.cache = cache; + this.executor = executor; } @Override @@ -81,7 +85,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> cached() { - return this; + throw new UnsupportedOperationException("Already a cached instance"); + } + + 
@Override + public CachedModeledFramework<T> cached(ExecutorService executor) + { + throw new UnsupportedOperationException("Already a cached instance"); } @Override @@ -99,13 +109,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> at(Object child) { - return new CachedModeledFrameworkImpl<>(client.at(child), cache); + return new CachedModeledFrameworkImpl<>(client.at(child), cache, executor); } @Override public CachedModeledFramework<T> withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); } @Override @@ -136,8 +146,8 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { DataTree.copyStat(node.stat(), storingStatIn); } - return new ModelStage<>(node.model()); - }).orElseGet(() -> new ModelStage<>(new KeeperException.NoNodeException(path.fullPath()))); + return completed(new ModelStage<>(), node.model()); + }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath()))); } @Override @@ -169,14 +179,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { ZPath path = client.modelSpec().path(); Optional<ZNode<T>> data = cache.currentData(path); - return data.map(node -> new ModelStage<>(node.stat())).orElseGet(() -> new ModelStage<>((Stat)null)); + return data.map(node -> completed(new ModelStage<>(), node.stat())).orElseGet(() -> completed(new ModelStage<>(), null)); } @Override public AsyncStage<List<ZPath>> children() { Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet(); - return new ModelStage<>(Lists.newArrayList(paths)); + return completed(new ModelStage<>(), Lists.newArrayList(paths)); } @Override @@ -226,4 +236,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { return client.inTransaction(operations); } + + 
private <U> ModelStage<U> completed(ModelStage<U> stage, U value) + { + executor.execute(() -> stage.complete(value)); + return stage; + } + + private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e) + { + executor.execute(() -> stage.completeExceptionally(e)); + return stage; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java index 77caed1..9be9a33 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java @@ -37,18 +37,6 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> this.event = event; } - ModelStage(T value) - { - event = null; - complete(value); - } - - ModelStage(Exception e) - { - event = null; - completeExceptionally(e); - } - @Override public CompletionStage<WatchedEvent> event() { http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 061ea17..091a727 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.async.modeled.details; 
+import com.google.common.util.concurrent.MoreExecutors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.ListenerContainer; @@ -36,6 +37,9 @@ import java.util.AbstractMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> @@ -57,12 +61,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> } } - ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec) + ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor) { this.serializer = modelSpec.serializer(); cache = TreeCache.newBuilder(client, modelSpec.path().fullPath()) .setCacheData(false) .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) + .setExecutor(executor) .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) .build(); } http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 754fb3b..b666822 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -23,6 +23,7 @@ import 
org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.utils.ThreadUtils; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; @@ -42,6 +43,7 @@ import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -98,8 +100,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public CachedModeledFramework<T> cached() { + return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework")); + } + + @Override + public CachedModeledFramework<T> cached(ExecutorService executor) + { Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers."); - return new CachedModeledFrameworkImpl<>(this); + return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null")); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-components.confluence ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/confluence/modeled-components.confluence b/curator-x-async/src/site/confluence/modeled-components.confluence index d44932a..ab49750 100644 --- a/curator-x-async/src/site/confluence/modeled-components.confluence +++ b/curator-x-async/src/site/confluence/modeled-components.confluence @@ -45,7 +45,7 @@ A {{ModelSpec}} contains all the metadata needed to operate on a ZooKeeper path: * Options for how to delete nodes (guaranteed, 
deleting children, etc.) ModelSpec instances are created via a builder. The builder sets defaults that should be -desired for most applications but you can alter any of these as needed. +useful for most applications but you can alter any of these as needed. {code} // a standard model spec for the given path and serializer @@ -87,7 +87,7 @@ The "set" call in the above example is the equivalent of: {code} MyModel instance = ... String path = "/foo/bar/" + instance.getId(); -byte[] data = serializer.serialize(data); +byte[] data = serializer.serialize(instance); client.create() .withOptions(Sets.newHashSet(CreateOption.createParentsAsContainers, CreateOption.setDataIfExists)) .forPath(path, data); http://git-wip-us.apache.org/repos/asf/curator/blob/f2370b77/curator-x-async/src/site/confluence/modeled-typed.confluence ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/confluence/modeled-typed.confluence b/curator-x-async/src/site/confluence/modeled-typed.confluence index 5b8affd..5a8a597 100644 --- a/curator-x-async/src/site/confluence/modeled-typed.confluence +++ b/curator-x-async/src/site/confluence/modeled-typed.confluence @@ -1,6 +1,6 @@ h1. Modeled Curator \- Caching and Typed Parameters -In addition to the [[main features|modeled-components.html]] Modeled Curator also supports +In addition to its [[main features|modeled-components.html]] Modeled Curator also supports integrated caching and typed parameters. h2. Caching @@ -35,7 +35,7 @@ Typed interfaces are provided for up to 10 parameters and are named {{TypedModeledFramework2}}, etc. Here's an example of a TypedModeledFramework that models a Person and uses two parameters -to generate the path, a Group and a Organization: +to generate the path, a Group and an Organization: {code} TypedModeledFramework2<Person, Group, Organization> clientTemplate = TypedModeledFramework2.from(
