Added variant to readAsZNode
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c1878392 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c1878392 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c1878392 Branch: refs/heads/CURATOR-397 Commit: c18783924af4b4c3f11d098165c85bf313f454c2 Parents: fe0a88b Author: randgalt <[email protected]> Authored: Sun May 7 09:41:55 2017 +0200 Committer: randgalt <[email protected]> Committed: Sun May 7 09:41:55 2017 +0200 ---------------------------------------------------------------------- .../x/async/modeled/ModeledFramework.java | 8 +++ .../apache/curator/x/async/modeled/ZNode.java | 49 ++++++++++++++ .../x/async/modeled/cached/ModeledCache.java | 1 + .../curator/x/async/modeled/cached/ZNode.java | 49 -------------- .../details/CachedModeledFrameworkImpl.java | 29 ++++++--- .../async/modeled/details/ModeledCacheImpl.java | 2 +- .../modeled/details/ModeledFrameworkImpl.java | 68 ++++++++++++-------- .../x/async/modeled/details/ZNodeImpl.java | 2 +- 8 files changed, 122 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java index f8cf4c7..b475712 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java @@ -170,6 +170,14 @@ public interface ModeledFramework<T> AsyncStage<T> read(Stat storingStatIn); /** + * Read the ZNode at this instance's path and deserialize into a model + * + * @return AsyncStage + * @see org.apache.curator.x.async.AsyncStage + */ + AsyncStage<ZNode<T>> readAsZNode(); + + /** * Update the ZNode at this instance's path with a serialized * form of the given model passing "-1" for the update version * http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java new file mode 100644 index 0000000..7ed6ef5 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ZNode.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled; + +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.data.Stat; + +/** + * Abstracts a ZooKeeper node + */ +public interface ZNode<T> +{ + /** + * The path of the node + * + * @return path + */ + ZPath path(); + + /** + * The node's last known stat if available + * + * @return stat + */ + Stat stat(); + + /** + * The node's current model + * + * @return model + */ + T model(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java index 9289988..6677268 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java @@ -18,6 +18,7 @@ */ package org.apache.curator.x.async.modeled.cached; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import java.util.Map; import java.util.Optional; http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java deleted file mode 100644 index 88f3489..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ZNode.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.cached; - -import org.apache.curator.x.async.modeled.ZPath; -import org.apache.zookeeper.data.Stat; - -/** - * Abstracts a cached node - */ -public interface ZNode<T> -{ - /** - * The path of the node - * - * @return path - */ - ZPath path(); - - /** - * The node's last known stat if available - * - * @return stat - */ - Stat stat(); - - /** - * The node's current model - * - * @return model - */ - T model(); -} http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 2209d37..9ef88e8 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -30,7 +30,7 @@ import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCache; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import org.apache.curator.x.async.modeled.cached.ZNode; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; @@ -39,6 +39,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> { @@ -133,21 +134,25 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> @Override public AsyncStage<T> read() { - return read(null); + return internalRead(ZNode::model); } @Override public AsyncStage<T> read(Stat storingStatIn) { - ZPath path = client.modelSpec().path(); - Optional<ZNode<T>> data = cache.currentData(path); - return data.map(node -> { + return internalRead(n -> { if ( storingStatIn != null ) { - DataTree.copyStat(node.stat(), storingStatIn); + DataTree.copyStat(n.stat(), storingStatIn); } - return completed(new ModelStage<>(), node.model()); - }).orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath()))); + return n.model(); + }); + } + + @Override + public AsyncStage<ZNode<T>> readAsZNode() + { + return internalRead(Function.identity()); } @Override @@ -248,4 +253,12 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> executor.execute(() -> stage.completeExceptionally(e)); return stage; } + + private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver) + { + ZPath path = client.modelSpec().path(); + Optional<ZNode<T>> data = cache.currentData(path); + return data.map(node -> completed(new ModelStage<>(), resolver.apply(node))) + .orElseGet(() -> completedExceptionally(new ModelStage<>(), new KeeperException.NoNodeException(path.fullPath()))); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index 353c28a..2de57c1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -31,7 +31,7 @@ import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.ModeledCache; import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import org.apache.curator.x.async.modeled.cached.ZNode; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.zookeeper.data.Stat; import java.util.AbstractMap; import java.util.Map; http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 66f10de..3bb1b73 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -35,15 +35,18 @@ import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -154,41 +157,25 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> @Override public AsyncStage<T> read() { - return read(null); + return internalRead(ZNode::model); } @Override public AsyncStage<T> read(Stat storingStatIn) { - AsyncPathable<AsyncStage<byte[]>> next; - if ( isCompressed() ) - { - next = (storingStatIn != null) ? watchableClient.getData().decompressedStoringStatIn(storingStatIn) : watchableClient.getData().decompressed(); - } - else - { - next = (storingStatIn != null) ? watchableClient.getData().storingStatIn(storingStatIn) : watchableClient.getData(); - } - AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath()); - ModelStage<T> modelStage = new ModelStage<>(asyncStage.event()); - asyncStage.whenComplete((value, e) -> { - if ( e != null ) + return internalRead(n -> { + if ( storingStatIn != null ) { - modelStage.completeExceptionally(e); - } - else - { - try - { - modelStage.complete(modelSpec.serializer().deserialize(value)); - } - catch ( Exception deserializeException ) - { - modelStage.completeExceptionally(deserializeException); - } + DataTree.copyStat(n.stat(), storingStatIn); } + return n.model(); }); - return modelStage; + } + + @Override + public AsyncStage<ZNode<T>> readAsZNode() + { + return internalRead(Function.identity()); } @Override @@ -349,4 +336,31 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> { return modelSpec.createOptions().contains(CreateOption.compress); } + + private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver) + { + Stat stat = new Stat(); + AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat); + AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath()); + ModelStage<U> modelStage = new ModelStage<>(asyncStage.event()); + asyncStage.whenComplete((value, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + try + { + ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value)); + modelStage.complete(resolver.apply(node)); + } + catch ( Exception deserializeException ) + { + modelStage.completeExceptionally(deserializeException); + } + } + }); + return modelStage; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/c1878392/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java index 20dcaac..85bedf4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java @@ -18,7 +18,7 @@ */ package org.apache.curator.x.async.modeled.details; -import org.apache.curator.x.async.modeled.cached.ZNode; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.zookeeper.data.Stat; import java.util.Objects;
