Repository: curator Updated Branches: refs/heads/CURATOR-397 396d98a51 -> 6037c7b27
further refinement of previous change. Not all completion handlers should be in the cache's thread. But, allow for the default async to use it Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6037c7b2 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6037c7b2 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6037c7b2 Branch: refs/heads/CURATOR-397 Commit: 6037c7b270b12dce273aa67939a78a4c00b5155f Parents: 396d98a Author: randgalt <[email protected]> Authored: Thu May 11 12:53:49 2017 +0200 Committer: randgalt <[email protected]> Committed: Thu May 11 12:53:49 2017 +0200 ---------------------------------------------------------------------- .../modeled/cached/CachedModeledFramework.java | 13 +- .../details/CachedModeledFrameworkImpl.java | 34 ++--- .../x/async/modeled/details/CachedStage.java | 117 ----------------- .../x/async/modeled/details/ModelStage.java | 125 ++++++++++++++++++- .../modeled/details/ModeledFrameworkImpl.java | 12 +- .../details/VersionedModeledFrameworkImpl.java | 2 +- .../modeled/TestCachedModeledFramework.java | 12 +- 7 files changed, 165 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java index 9e1fb68..27bd57e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ZPath; import java.io.Closeable; +import java.util.concurrent.Executor; public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeable { @@ -30,7 +31,17 @@ public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeabl * * @return cache */ - ModeledCache<T> getCache(); + ModeledCache<T> cache(); + + /** + * Returns a view of this instance that uses the CachedModeledFramework's executor + * for all default async completion operations. i.e. when you use, for example, + * {@link java.util.concurrent.CompletionStage#handleAsync(java.util.function.BiFunction)} + * this instance's executor is used instead of <code>ForkJoinPool.commonPool()</code>. + * + * @return view + */ + CachedModeledFramework<T> asyncDefault(); /** * Start the internally created cache http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 9e1fa8f..7df2e98 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -47,26 +47,34 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> private final ModeledFramework<T> client; private final ModeledCacheImpl<T> cache; private final Executor executor; + private final boolean asyncDefaultMode; CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) { - this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor, false); } - private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor) + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor, boolean asyncDefaultMode) { this.client = client; this.cache = cache; this.executor = executor; + this.asyncDefaultMode = asyncDefaultMode; } @Override - public ModeledCache<T> getCache() + public ModeledCache<T> cache() { return cache; } @Override + public CachedModeledFramework<T> asyncDefault() + { + return new CachedModeledFrameworkImpl<>(client, cache, executor, true); + } + + @Override public void start() { cache.start(); @@ -117,13 +125,13 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public CachedModeledFramework<T> at(Object child) { - return new CachedModeledFrameworkImpl<>(client.at(child), cache, executor); + return new CachedModeledFrameworkImpl<>(client.at(child), cache, executor, asyncDefaultMode); } @Override public CachedModeledFramework<T> withPath(ZPath path) { - return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor); + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, asyncDefaultMode); } @Override @@ -261,25 +269,21 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> return client.inTransaction(operations); } - private <U> CachedStage<U> completed(U value) + private <U> AsyncStage<U> completed(U value) { - CachedStage<U> stage = new CachedStage<>(executor); - stage.complete(value); - return stage; + return asyncDefaultMode ? ModelStage.asyncCompleted(value, executor) : ModelStage.completed(value); } - private <U> CachedStage<U> completedExceptionally(Exception e) + private <U> AsyncStage<U> exceptionally(Exception e) { - CachedStage<U> stage = new CachedStage<>(executor); - stage.completeExceptionally(e); - return stage; + return asyncDefaultMode ? ModelStage.asyncExceptionally(e, executor) : ModelStage.exceptionally(e); } - private <U> CachedStage<U> internalRead(Function<ZNode<T>, U> resolver) + private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver) { ZPath path = client.modelSpec().path(); Optional<ZNode<T>> data = cache.currentData(path); return data.map(node -> completed(resolver.apply(node))) - .orElseGet(() -> completedExceptionally(new KeeperException.NoNodeException(path.fullPath()))); + .orElseGet(() -> exceptionally(new KeeperException.NoNodeException(path.fullPath()))); } } http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java deleted file mode 100644 index edf6b98..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.details; - -import org.apache.curator.x.async.AsyncStage; -import org.apache.zookeeper.WatchedEvent; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; - -class CachedStage<T> extends CompletableFuture<T> implements AsyncStage<T> -{ - private final Executor executor; - - CachedStage(Executor executor) - { - this.executor = executor; - } - - @Override - public CompletionStage<WatchedEvent> event() - { - return null; - } - - @Override - public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) - { - return thenApplyAsync(fn, executor); - } - - @Override - public CompletableFuture<Void> thenAccept(Consumer<? super T> action) - { - return thenAcceptAsync(action, executor); - } - - @Override - public CompletableFuture<Void> thenRun(Runnable action) - { - return thenRunAsync(action, executor); - } - - @Override - public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) - { - return thenCombineAsync(other, fn, executor); - } - - @Override - public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) - { - return thenAcceptBothAsync(other, action, executor); - } - - @Override - public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) - { - return runAfterBothAsync(other, action, executor); - } - - @Override - public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) - { - return applyToEitherAsync(other, fn, executor); - } - - @Override - public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) - { - return acceptEitherAsync(other, action, executor); - } - - @Override - public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) - { - return runAfterEitherAsync(other, action, executor); - } - - @Override - public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) - { - return thenComposeAsync(fn, executor); - } - - @Override - public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) - { - return whenCompleteAsync(action, executor); - } - - @Override - public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) - { - return handleAsync(fn, executor); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java index f132df5..dfeb5d1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java @@ -22,14 +22,47 @@ import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.WatchedEvent; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> { private final CompletionStage<WatchedEvent> event; - ModelStage(CompletionStage<WatchedEvent> event) + static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event) { - this.event = event; + return new ModelStage<>(event); + } + + static <U> ModelStage<U> completed(U value) + { + ModelStage<U> stage = new ModelStage<>(null); + stage.complete(value); + return stage; + } + + static <U> ModelStage<U> exceptionally(Exception e) + { + ModelStage<U> stage = new ModelStage<>(null); + stage.completeExceptionally(e); + return stage; + } + + static <U> ModelStage<U> asyncCompleted(U value, Executor executor) + { + ModelStage<U> stage = new AsyncModelStage<>(executor); + stage.complete(value); + return stage; + } + + static <U> ModelStage<U> asyncExceptionally(Exception e, Executor executor) + { + ModelStage<U> stage = new AsyncModelStage<>(executor); + stage.completeExceptionally(e); + return stage; } @Override @@ -37,4 +70,92 @@ class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> { return event; } + + private ModelStage(CompletionStage<WatchedEvent> event) + { + this.event = event; + } + + private static class AsyncModelStage<U> extends ModelStage<U> + { + private final Executor executor; + + public AsyncModelStage(Executor executor) + { + super(null); + this.executor = executor; + } + + @Override + public <U1> CompletableFuture<U1> thenApplyAsync(Function<? super U, ? extends U1> fn) + { + return super.thenApplyAsync(fn, executor); + } + + @Override + public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> action) + { + return super.thenAcceptAsync(action, executor); + } + + @Override + public CompletableFuture<Void> thenRunAsync(Runnable action) + { + return super.thenRunAsync(action, executor); + } + + @Override + public <U1, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U1> other, BiFunction<? super U, ? super U1, ? extends V> fn) + { + return super.thenCombineAsync(other, fn, executor); + } + + @Override + public <U1> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U1> other, BiConsumer<? super U, ? super U1> action) + { + return super.thenAcceptBothAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) + { + return super.runAfterBothAsync(other, action, executor); + } + + @Override + public <U1> CompletableFuture<U1> applyToEitherAsync(CompletionStage<? extends U> other, Function<? super U, U1> fn) + { + return super.applyToEitherAsync(other, fn, executor); + } + + @Override + public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends U> other, Consumer<? super U> action) + { + return super.acceptEitherAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) + { + return super.runAfterEitherAsync(other, action, executor); + } + + @Override + public <U1> CompletableFuture<U1> thenComposeAsync(Function<? super U, ? extends CompletionStage<U1>> fn) + { + return super.thenComposeAsync(fn, executor); + } + + @Override + public CompletableFuture<U> whenCompleteAsync(BiConsumer<? super U, ? super Throwable> action) + { + return super.whenCompleteAsync(action, executor); + } + + @Override + public <U1> CompletableFuture<U1> handleAsync(BiFunction<? super U, Throwable, ? extends U1> fn) + { + return super.handleAsync(fn, executor); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index ef8192f..7be713c 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -161,9 +161,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> } catch ( Exception e ) { - ModelStage<String> exceptionStage = new ModelStage<>(null); - exceptionStage.completeExceptionally(e); - return exceptionStage; + return ModelStage.exceptionally(e); } } @@ -207,9 +205,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> } catch ( Exception e ) { - ModelStage<Stat> exceptionStage = new ModelStage<>(null); - exceptionStage.completeExceptionally(e); - return exceptionStage; + return ModelStage.exceptionally(e); } } @@ -235,7 +231,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> public AsyncStage<List<ZPath>> children() { AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(modelSpec.path().fullPath()); - ModelStage<List<ZPath>> modelStage = new ModelStage<>(asyncStage.event()); + ModelStage<List<ZPath>> modelStage = ModelStage.make(asyncStage.event()); asyncStage.whenComplete((children, e) -> { if ( e != null ) { @@ -354,7 +350,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> Stat stat = (storingStatIn != null) ? storingStatIn : new Stat(); AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat); AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath()); - ModelStage<U> modelStage = new ModelStage<>(asyncStage.event()); + ModelStage<U> modelStage = ModelStage.make(asyncStage.event()); asyncStage.whenComplete((value, e) -> { if ( e != null ) { http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java index 9717cff..89d7615 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java @@ -57,7 +57,7 @@ class VersionedModeledFrameworkImpl<T> implements VersionedModeledFramework<T> { Stat localStat = (storingStatIn != null) ? storingStatIn : new Stat(); AsyncStage<T> stage = client.read(localStat); - ModelStage<Versioned<T>> modelStage = new ModelStage<>(stage.event()); + ModelStage<Versioned<T>> modelStage = ModelStage.make(stage.event()); stage.whenComplete((model, e) -> { if ( e != null ) { http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index a9048de..dc1138e 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -39,7 +39,7 @@ public class TestCachedModeledFramework extends TestModeledFrameworkBase public void testThreading() { TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); - CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached().asyncDefault(); CountDownLatch latch = new CountDownLatch(1); client.listenable().addListener((type, path1, stat, model1) -> latch.countDown()); @@ -51,12 +51,12 @@ public class TestCachedModeledFramework extends TestModeledFrameworkBase Assert.assertTrue(new Timing().awaitLatch(latch)); AtomicReference<Thread> completionThread = new AtomicReference<>(); - complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); + complete(client.read().whenCompleteAsync((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); Assert.assertNotNull(completionThread.get()); Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); completionThread.set(null); - complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + complete(client.at("foo").read().whenCompleteAsync((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); Assert.assertNotNull(completionThread.get()); Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); completionThread.set(null); @@ -77,7 +77,7 @@ public class TestCachedModeledFramework extends TestModeledFrameworkBase return thread; }); TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); - CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(executor); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(executor).asyncDefault(); CountDownLatch latch = new CountDownLatch(1); client.listenable().addListener((type, path1, stat, model1) -> latch.countDown()); @@ -89,11 +89,11 @@ public class TestCachedModeledFramework extends TestModeledFrameworkBase Assert.assertTrue(new Timing().awaitLatch(latch)); AtomicReference<Thread> completionThread = new AtomicReference<>(); - complete(client.read().thenAccept(s -> completionThread.set(Thread.currentThread()))); + complete(client.read().thenAcceptAsync(s -> completionThread.set(Thread.currentThread()))); Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); completionThread.set(null); - complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + complete(client.at("foo").read().whenCompleteAsync((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); completionThread.set(null); }
