CachedModeledFrameworkImpl wasn't using executor in right way. Completions need to happen async not setting of the value. This required creating CachedStage which proxies all non-async methods to their async counterparts using the CachedModeledFrameworkImpl's executor
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6485f165 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6485f165 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6485f165 Branch: refs/heads/CURATOR-397 Commit: 6485f1650e37e392e6122f8b13ff432ee0321666 Parents: 80ca587 Author: randgalt <[email protected]> Authored: Wed May 10 19:23:00 2017 +0200 Committer: randgalt <[email protected]> Committed: Wed May 10 19:23:00 2017 +0200 ---------------------------------------------------------------------- .../x/async/api/AsyncCuratorFrameworkDsl.java | 1 - .../details/CachedModeledFrameworkImpl.java | 20 ++-- .../x/async/modeled/details/CachedStage.java | 117 +++++++++++++++++++ .../modeled/TestCachedModeledFramework.java | 87 ++++++++++++++ .../x/async/modeled/TestModeledFramework.java | 10 +- 5 files changed, 220 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java index 0807160..bc66bb6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java @@ -19,7 +19,6 @@ package org.apache.curator.x.async.api; import org.apache.curator.framework.api.transaction.CuratorOp; -import org.apache.curator.x.async.WatchMode; /** * Zookeeper framework-style client http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 15db3ba..9e1fa8f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -203,14 +203,14 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { ZPath path = client.modelSpec().path(); Optional<ZNode<T>> data = cache.currentData(path); - return data.map(node -> completed(new ModelStage<>(null), node.stat())).orElseGet(() -> completed(new ModelStage<>(null), null)); + return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null)); } @Override public AsyncStage<List<ZPath>> children() { Set<ZPath> paths = cache.currentChildren(client.modelSpec().path()).keySet(); - return completed(new ModelStage<>(null), Lists.newArrayList(paths)); + return completed(Lists.newArrayList(paths)); } @Override @@ -261,23 +261,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> return client.inTransaction(operations); } - private <U> ModelStage<U> completed(ModelStage<U> stage, U value) + private <U> CachedStage<U> completed(U value) { - executor.execute(() -> stage.complete(value)); + CachedStage<U> stage = new CachedStage<>(executor); + stage.complete(value); return stage; } - private <U> ModelStage<U> completedExceptionally(ModelStage<U> stage, Exception e) + private <U> CachedStage<U> completedExceptionally(Exception e) { - executor.execute(() -> stage.completeExceptionally(e)); + CachedStage<U> stage = new CachedStage<>(executor); + stage.completeExceptionally(e); return stage; } - private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver) + private <U> CachedStage<U> internalRead(Function<ZNode<T>, U> resolver) { ZPath path = client.modelSpec().path(); Optional<ZNode<T>> data = cache.currentData(path); - return data.map(node -> completed(new ModelStage<>(null), resolver.apply(node))) - .orElseGet(() -> completedExceptionally(new ModelStage<>(null), new KeeperException.NoNodeException(path.fullPath()))); + return data.map(node -> completed(resolver.apply(node))) + .orElseGet(() -> completedExceptionally(new KeeperException.NoNodeException(path.fullPath()))); } } http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java new file mode 100644 index 0000000..edf6b98 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.WatchedEvent; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +class CachedStage<T> extends CompletableFuture<T> implements AsyncStage<T> +{ + private final Executor executor; + + CachedStage(Executor executor) + { + this.executor = executor; + } + + @Override + public CompletionStage<WatchedEvent> event() + { + return null; + } + + @Override + public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) + { + return thenApplyAsync(fn, executor); + } + + @Override + public CompletableFuture<Void> thenAccept(Consumer<? super T> action) + { + return thenAcceptAsync(action, executor); + } + + @Override + public CompletableFuture<Void> thenRun(Runnable action) + { + return thenRunAsync(action, executor); + } + + @Override + public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) + { + return thenCombineAsync(other, fn, executor); + } + + @Override + public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) + { + return thenAcceptBothAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) + { + return runAfterBothAsync(other, action, executor); + } + + @Override + public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) + { + return applyToEitherAsync(other, fn, executor); + } + + @Override + public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) + { + return acceptEitherAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) + { + return runAfterEitherAsync(other, action, executor); + } + + @Override + public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) + { + return thenComposeAsync(fn, executor); + } + + @Override + public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) + { + return whenCompleteAsync(action, executor); + } + + @Override + public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) + { + return handleAsync(fn, executor); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java new file mode 100644 index 0000000..7be7c28 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled; + +import org.apache.curator.test.Timing; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.models.TestModel; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.math.BigInteger; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +public class TestCachedModeledFramework extends TestModeledFramework +{ + @Test + public void testThreading() + { + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(); + + CountDownLatch latch = new CountDownLatch(1); + client.listenable().addListener((type, path1, stat, model1) -> latch.countDown()); + + complete(client.set(model)); + client.start(); + Assert.assertTrue(new Timing().awaitLatch(latch)); + + AtomicReference<Thread> completionThread = new AtomicReference<>(); + complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); + Assert.assertNotNull(completionThread.get()); + Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); + completionThread.set(null); + + complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + Assert.assertNotNull(completionThread.get()); + Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); + completionThread.set(null); + } + + @Test + public void testCustomThreading() + { + AtomicReference<Thread> ourThread = new AtomicReference<>(); + ExecutorService executor = Executors.newSingleThreadExecutor(r -> { + Thread thread = new Thread(r, "testCustomThreading"); + ourThread.set(thread); + return thread; + }); + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(executor); + + CountDownLatch latch = new CountDownLatch(1); + client.listenable().addListener((type, path1, stat, model1) -> latch.countDown()); + + complete(client.set(model)); + client.start(); + Assert.assertTrue(new Timing().awaitLatch(latch)); + + AtomicReference<Thread> completionThread = new AtomicReference<>(); + complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); + Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); + completionThread.set(null); + + complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); + completionThread.set(null); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/6485f165/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java index b21c8ca..98d5ee1 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java @@ -45,11 +45,11 @@ import java.util.concurrent.CountDownLatch; public class TestModeledFramework extends CompletableBaseClassForTests { - private static final ZPath path = ZPath.parse("/test/path"); - private CuratorFramework rawClient; - private ModelSpec<TestModel> modelSpec; - private ModelSpec<TestNewerModel> newModelSpec; - private AsyncCuratorFramework async; + protected static final ZPath path = ZPath.parse("/test/path"); + protected CuratorFramework rawClient; + protected ModelSpec<TestModel> modelSpec; + protected ModelSpec<TestNewerModel> newModelSpec; + protected AsyncCuratorFramework async; @BeforeMethod @Override
