Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 396d98a51 -> 6037c7b27


further refinement of previous change. Not all completion handlers  should be 
in the cache's thread. But, allow for the default async to use it


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6037c7b2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6037c7b2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6037c7b2

Branch: refs/heads/CURATOR-397
Commit: 6037c7b270b12dce273aa67939a78a4c00b5155f
Parents: 396d98a
Author: randgalt <[email protected]>
Authored: Thu May 11 12:53:49 2017 +0200
Committer: randgalt <[email protected]>
Committed: Thu May 11 12:53:49 2017 +0200

----------------------------------------------------------------------
 .../modeled/cached/CachedModeledFramework.java  |  13 +-
 .../details/CachedModeledFrameworkImpl.java     |  34 ++---
 .../x/async/modeled/details/CachedStage.java    | 117 -----------------
 .../x/async/modeled/details/ModelStage.java     | 125 ++++++++++++++++++-
 .../modeled/details/ModeledFrameworkImpl.java   |  12 +-
 .../details/VersionedModeledFrameworkImpl.java  |   2 +-
 .../modeled/TestCachedModeledFramework.java     |  12 +-
 7 files changed, 165 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
index 9e1fb68..27bd57e 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.x.async.modeled.ModeledFramework;
 import org.apache.curator.x.async.modeled.ZPath;
 import java.io.Closeable;
+import java.util.concurrent.Executor;
 
 public interface CachedModeledFramework<T> extends ModeledFramework<T>, 
Closeable
 {
@@ -30,7 +31,17 @@ public interface CachedModeledFramework<T> extends 
ModeledFramework<T>, Closeabl
      *
      * @return cache
      */
-    ModeledCache<T> getCache();
+    ModeledCache<T> cache();
+
+    /**
+     * Returns a view of this instance that uses the CachedModeledFramework's 
executor
+     * for all default async completion operations. i.e. when you use, for 
example,
+     * {@link 
java.util.concurrent.CompletionStage#handleAsync(java.util.function.BiFunction)}
+     * this instance's executor is used instead of 
<code>ForkJoinPool.commonPool()</code>.
+     *
+     * @return view
+     */
+    CachedModeledFramework<T> asyncDefault();
 
     /**
      * Start the internally created cache

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index 9e1fa8f..7df2e98 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -47,26 +47,34 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
     private final Executor executor;
+    private final boolean asyncDefaultMode;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService 
executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), 
client.modelSpec(), executor), executor, false);
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, 
ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, 
ModeledCacheImpl<T> cache, Executor executor, boolean asyncDefaultMode)
     {
         this.client = client;
         this.cache = cache;
         this.executor = executor;
+        this.asyncDefaultMode = asyncDefaultMode;
     }
 
     @Override
-    public ModeledCache<T> getCache()
+    public ModeledCache<T> cache()
     {
         return cache;
     }
 
     @Override
+    public CachedModeledFramework<T> asyncDefault()
+    {
+        return new CachedModeledFrameworkImpl<>(client, cache, executor, true);
+    }
+
+    @Override
     public void start()
     {
         cache.start();
@@ -117,13 +125,13 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> at(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.at(child), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.at(child), cache, 
executor, asyncDefaultMode);
     }
 
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, 
executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, 
executor, asyncDefaultMode);
     }
 
     @Override
@@ -261,25 +269,21 @@ class CachedModeledFrameworkImpl<T> implements 
CachedModeledFramework<T>
         return client.inTransaction(operations);
     }
 
-    private <U> CachedStage<U> completed(U value)
+    private <U> AsyncStage<U> completed(U value)
     {
-        CachedStage<U> stage = new CachedStage<>(executor);
-        stage.complete(value);
-        return stage;
+        return asyncDefaultMode ? ModelStage.asyncCompleted(value, executor) : 
ModelStage.completed(value);
     }
 
-    private <U> CachedStage<U> completedExceptionally(Exception e)
+    private <U> AsyncStage<U> exceptionally(Exception e)
     {
-        CachedStage<U> stage = new CachedStage<>(executor);
-        stage.completeExceptionally(e);
-        return stage;
+        return asyncDefaultMode ? ModelStage.asyncExceptionally(e, executor) : 
ModelStage.exceptionally(e);
     }
 
-    private <U> CachedStage<U> internalRead(Function<ZNode<T>, U> resolver)
+    private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver)
     {
         ZPath path = client.modelSpec().path();
         Optional<ZNode<T>> data = cache.currentData(path);
         return data.map(node -> completed(resolver.apply(node)))
-            .orElseGet(() -> completedExceptionally(new 
KeeperException.NoNodeException(path.fullPath())));
+            .orElseGet(() -> exceptionally(new 
KeeperException.NoNodeException(path.fullPath())));
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
deleted file mode 100644
index edf6b98..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedStage.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.details;
-
-import org.apache.curator.x.async.AsyncStage;
-import org.apache.zookeeper.WatchedEvent;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-class CachedStage<T> extends CompletableFuture<T> implements AsyncStage<T>
-{
-    private final Executor executor;
-
-    CachedStage(Executor executor)
-    {
-        this.executor = executor;
-    }
-
-    @Override
-    public CompletionStage<WatchedEvent> event()
-    {
-        return null;
-    }
-
-    @Override
-    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> 
fn)
-    {
-        return thenApplyAsync(fn, executor);
-    }
-
-    @Override
-    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
-    {
-        return thenAcceptAsync(action, executor);
-    }
-
-    @Override
-    public CompletableFuture<Void> thenRun(Runnable action)
-    {
-        return thenRunAsync(action, executor);
-    }
-
-    @Override
-    public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends 
U> other, BiFunction<? super T, ? super U, ? extends V> fn)
-    {
-        return thenCombineAsync(other, fn, executor);
-    }
-
-    @Override
-    public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? 
extends U> other, BiConsumer<? super T, ? super U> action)
-    {
-        return thenAcceptBothAsync(other, action, executor);
-    }
-
-    @Override
-    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 
Runnable action)
-    {
-        return runAfterBothAsync(other, action, executor);
-    }
-
-    @Override
-    public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> 
other, Function<? super T, U> fn)
-    {
-        return applyToEitherAsync(other, fn, executor);
-    }
-
-    @Override
-    public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> 
other, Consumer<? super T> action)
-    {
-        return acceptEitherAsync(other, action, executor);
-    }
-
-    @Override
-    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 
Runnable action)
-    {
-        return runAfterEitherAsync(other, action, executor);
-    }
-
-    @Override
-    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends 
CompletionStage<U>> fn)
-    {
-        return thenComposeAsync(fn, executor);
-    }
-
-    @Override
-    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super 
Throwable> action)
-    {
-        return whenCompleteAsync(action, executor);
-    }
-
-    @Override
-    public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? 
extends U> fn)
-    {
-        return handleAsync(fn, executor);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
index f132df5..dfeb5d1 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java
@@ -22,14 +22,47 @@ import org.apache.curator.x.async.AsyncStage;
 import org.apache.zookeeper.WatchedEvent;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T>
 {
     private final CompletionStage<WatchedEvent> event;
 
-    ModelStage(CompletionStage<WatchedEvent> event)
+    static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event)
     {
-        this.event = event;
+        return new ModelStage<>(event);
+    }
+
+    static <U> ModelStage<U> completed(U value)
+    {
+        ModelStage<U> stage = new ModelStage<>(null);
+        stage.complete(value);
+        return stage;
+    }
+
+    static <U> ModelStage<U> exceptionally(Exception e)
+    {
+        ModelStage<U> stage = new ModelStage<>(null);
+        stage.completeExceptionally(e);
+        return stage;
+    }
+
+    static <U> ModelStage<U> asyncCompleted(U value, Executor executor)
+    {
+        ModelStage<U> stage = new AsyncModelStage<>(executor);
+        stage.complete(value);
+        return stage;
+    }
+
+    static <U> ModelStage<U> asyncExceptionally(Exception e, Executor executor)
+    {
+        ModelStage<U> stage = new AsyncModelStage<>(executor);
+        stage.completeExceptionally(e);
+        return stage;
     }
 
     @Override
@@ -37,4 +70,92 @@ class ModelStage<T> extends CompletableFuture<T> implements 
AsyncStage<T>
     {
         return event;
     }
+
+    private ModelStage(CompletionStage<WatchedEvent> event)
+    {
+        this.event = event;
+    }
+
+    private static class AsyncModelStage<U> extends ModelStage<U>
+    {
+        private final Executor executor;
+
+        public AsyncModelStage(Executor executor)
+        {
+            super(null);
+            this.executor = executor;
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> thenApplyAsync(Function<? super U, ? 
extends U1> fn)
+        {
+            return super.thenApplyAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> 
action)
+        {
+            return super.thenAcceptAsync(action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> thenRunAsync(Runnable action)
+        {
+            return super.thenRunAsync(action, executor);
+        }
+
+        @Override
+        public <U1, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? 
extends U1> other, BiFunction<? super U, ? super U1, ? extends V> fn)
+        {
+            return super.thenCombineAsync(other, fn, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<Void> 
thenAcceptBothAsync(CompletionStage<? extends U1> other, BiConsumer<? super U, 
? super U1> action)
+        {
+            return super.thenAcceptBothAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> 
other, Runnable action)
+        {
+            return super.runAfterBothAsync(other, action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> applyToEitherAsync(CompletionStage<? 
extends U> other, Function<? super U, U1> fn)
+        {
+            return super.applyToEitherAsync(other, fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? 
extends U> other, Consumer<? super U> action)
+        {
+            return super.acceptEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> 
other, Runnable action)
+        {
+            return super.runAfterEitherAsync(other, action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> thenComposeAsync(Function<? super U, 
? extends CompletionStage<U1>> fn)
+        {
+            return super.thenComposeAsync(fn, executor);
+        }
+
+        @Override
+        public CompletableFuture<U> whenCompleteAsync(BiConsumer<? super U, ? 
super Throwable> action)
+        {
+            return super.whenCompleteAsync(action, executor);
+        }
+
+        @Override
+        public <U1> CompletableFuture<U1> handleAsync(BiFunction<? super U, 
Throwable, ? extends U1> fn)
+        {
+            return super.handleAsync(fn, executor);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index ef8192f..7be713c 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -161,9 +161,7 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
         }
         catch ( Exception e )
         {
-            ModelStage<String> exceptionStage = new ModelStage<>(null);
-            exceptionStage.completeExceptionally(e);
-            return exceptionStage;
+            return ModelStage.exceptionally(e);
         }
     }
 
@@ -207,9 +205,7 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
         }
         catch ( Exception e )
         {
-            ModelStage<Stat> exceptionStage = new ModelStage<>(null);
-            exceptionStage.completeExceptionally(e);
-            return exceptionStage;
+            return ModelStage.exceptionally(e);
         }
     }
 
@@ -235,7 +231,7 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
     public AsyncStage<List<ZPath>> children()
     {
         AsyncStage<List<String>> asyncStage = 
watchableClient.getChildren().forPath(modelSpec.path().fullPath());
-        ModelStage<List<ZPath>> modelStage = new 
ModelStage<>(asyncStage.event());
+        ModelStage<List<ZPath>> modelStage = 
ModelStage.make(asyncStage.event());
         asyncStage.whenComplete((children, e) -> {
             if ( e != null )
             {
@@ -354,7 +350,7 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T>
         Stat stat = (storingStatIn != null) ? storingStatIn : new Stat();
         AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? 
watchableClient.getData().decompressedStoringStatIn(stat) : 
watchableClient.getData().storingStatIn(stat);
         AsyncStage<byte[]> asyncStage = 
next.forPath(modelSpec.path().fullPath());
-        ModelStage<U> modelStage = new ModelStage<>(asyncStage.event());
+        ModelStage<U> modelStage = ModelStage.make(asyncStage.event());
         asyncStage.whenComplete((value, e) -> {
             if ( e != null )
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
index 9717cff..89d7615 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java
@@ -57,7 +57,7 @@ class VersionedModeledFrameworkImpl<T> implements 
VersionedModeledFramework<T>
     {
         Stat localStat = (storingStatIn != null) ? storingStatIn : new Stat();
         AsyncStage<T> stage = client.read(localStat);
-        ModelStage<Versioned<T>> modelStage = new ModelStage<>(stage.event());
+        ModelStage<Versioned<T>> modelStage = ModelStage.make(stage.event());
         stage.whenComplete((model, e) -> {
            if ( e != null )
            {

http://git-wip-us.apache.org/repos/asf/curator/blob/6037c7b2/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index a9048de..dc1138e 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -39,7 +39,7 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase
     public void testThreading()
     {
         TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
-        CachedModeledFramework<TestModel> client = 
ModeledFramework.wrap(async, modelSpec).cached();
+        CachedModeledFramework<TestModel> client = 
ModeledFramework.wrap(async, modelSpec).cached().asyncDefault();
 
         CountDownLatch latch = new CountDownLatch(1);
         client.listenable().addListener((type, path1, stat, model1) -> 
latch.countDown());
@@ -51,12 +51,12 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase
             Assert.assertTrue(new Timing().awaitLatch(latch));
 
             AtomicReference<Thread> completionThread = new AtomicReference<>();
-            complete(client.read().whenComplete((s, e) -> 
completionThread.set((e == null) ? Thread.currentThread() : null)));
+            complete(client.read().whenCompleteAsync((s, e) -> 
completionThread.set((e == null) ? Thread.currentThread() : null)));
             Assert.assertNotNull(completionThread.get());
             Assert.assertNotEquals(Thread.currentThread(), 
completionThread.get(), "Should be different threads");
             completionThread.set(null);
 
-            complete(client.at("foo").read().whenComplete((v, e) -> 
completionThread.set((e != null) ? Thread.currentThread() : null)));
+            complete(client.at("foo").read().whenCompleteAsync((v, e) -> 
completionThread.set((e != null) ? Thread.currentThread() : null)));
             Assert.assertNotNull(completionThread.get());
             Assert.assertNotEquals(Thread.currentThread(), 
completionThread.get(), "Should be different threads");
             completionThread.set(null);
@@ -77,7 +77,7 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase
             return thread;
         });
         TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE);
-        CachedModeledFramework<TestModel> client = 
ModeledFramework.wrap(async, modelSpec).cached(executor);
+        CachedModeledFramework<TestModel> client = 
ModeledFramework.wrap(async, modelSpec).cached(executor).asyncDefault();
 
         CountDownLatch latch = new CountDownLatch(1);
         client.listenable().addListener((type, path1, stat, model1) -> 
latch.countDown());
@@ -89,11 +89,11 @@ public class TestCachedModeledFramework extends 
TestModeledFrameworkBase
             Assert.assertTrue(new Timing().awaitLatch(latch));
 
             AtomicReference<Thread> completionThread = new AtomicReference<>();
-            complete(client.read().thenAccept(s -> 
completionThread.set(Thread.currentThread())));
+            complete(client.read().thenAcceptAsync(s -> 
completionThread.set(Thread.currentThread())));
             Assert.assertEquals(ourThread.get(), completionThread.get(), 
"Should be our thread");
             completionThread.set(null);
 
-            complete(client.at("foo").read().whenComplete((v, e) -> 
completionThread.set((e != null) ? Thread.currentThread() : null)));
+            complete(client.at("foo").read().whenCompleteAsync((v, e) -> 
completionThread.set((e != null) ? Thread.currentThread() : null)));
             Assert.assertEquals(ourThread.get(), completionThread.get(), 
"Should be our thread");
             completionThread.set(null);
         }

Reply via email to