http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java new file mode 100644 index 0000000..8acbebb --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/CachedModeledFramework.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.cached; + +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.data.Stat; +import java.io.Closeable; +import java.util.List; + +public interface CachedModeledFramework<T> extends ModeledFramework<T>, Closeable +{ + /** + * Return the cache instance + * + * @return cache + */ + ModeledCache<T> cache(); + + /** + * Returns a view of this instance that uses the CachedModeledFramework's executor + * for all default async completion operations. i.e. when you use, for example, + * {@link java.util.concurrent.CompletionStage#handleAsync(java.util.function.BiFunction)} + * this instance's executor is used instead of <code>ForkJoinPool.commonPool()</code>. + * + * @return view + */ + CachedModeledFramework<T> asyncDefault(); + + /** + * Start the internally created cache + */ + void start(); + + /** + * Close/stop the internally created cache + */ + @Override + void close(); + + /** + * Return the listener container so that you can add/remove listeners + * + * @return listener container + */ + Listenable<ModeledCacheListener<T>> listenable(); + + /** + * Same as {@link org.apache.curator.x.async.modeled.ModeledFramework#childrenAsZNodes()} + * but always reads from cache - i.e. no additional queries to ZooKeeper are made + * + * @return AsyncStage stage + */ + @Override + AsyncStage<List<ZNode<T>>> childrenAsZNodes(); + + /** + * {@inheritDoc} + */ + @Override + CachedModeledFramework<T> child(Object child); + + /** + * {@inheritDoc} + */ + @Override + CachedModeledFramework<T> withPath(ZPath path); + + /** + * Same as {@link #read()} except that if the cache does not have a value + * for this path a direct query is made. + * + * @return AsyncStage + * @see org.apache.curator.x.async.AsyncStage + */ + AsyncStage<T> readThrough(); + + /** + * Same as {@link #read(org.apache.zookeeper.data.Stat)} except that if the cache does not have a value + * for this path a direct query is made. + * + * @param storingStatIn the stat for the new ZNode is stored here + * @return AsyncStage + * @see org.apache.curator.x.async.AsyncStage + */ + AsyncStage<T> readThrough(Stat storingStatIn); + + /** + * Same as {@link #readAsZNode()} except that if the cache does not have a value + * for this path a direct query is made. + * + * + * @return AsyncStage + * @see org.apache.curator.x.async.AsyncStage + */ + AsyncStage<ZNode<T>> readThroughAsZNode(); + + /** + * Return the instances of the base path of this cached framework + * + * @return listing of all models in the base path + */ + AsyncStage<List<T>> list(); +}
http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java new file mode 100644 index 0000000..6677268 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCache.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.cached; + +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import java.util.Map; +import java.util.Optional; + +public interface ModeledCache<T> +{ + /** + * Return the modeled current data for the given path. There are no guarantees of accuracy. This is + * merely the most recent view of the data. If there is no node at the given path, + * {@link java.util.Optional#empty()} is returned. + * + * @param path path to the node to check + * @return data if the node is alive, or empty + */ + Optional<ZNode<T>> currentData(ZPath path); + + /** + * Return the modeled current set of children at the given path, mapped by child name. There are no + * guarantees of accuracy; this is merely the most recent view of the data. + * + * @param path path to the node to check + * @return a possibly-empty map of children if the node is alive + */ + Map<ZPath, ZNode<T>> currentChildren(ZPath path); +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java new file mode 100644 index 0000000..42498c0 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.cached; + +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.data.Stat; +import org.slf4j.LoggerFactory; + +@FunctionalInterface +public interface ModeledCacheListener<T> +{ + enum Type + { + /** + * A child was added to the path + */ + NODE_ADDED, + + /** + * A child's data was changed + */ + NODE_UPDATED, + + /** + * A child was removed from the path + */ + NODE_REMOVED + } + + /** + * The given path was added, updated or removed + * + * @param type action type + * @param path the path + * @param stat the node's stat (previous stat for removal) + * @param model the node's model (previous model for removal) + */ + void accept(Type type, ZPath path, Stat stat, T model); + + /** + * The cache has finished initializing + */ + default void initialized() + { + // NOP + } + + /** + * Called when there is an exception processing a message from the internal cache. This is most + * likely due to a de-serialization problem. + * + * @param e the exception + */ + default void handleException(Exception e) + { + LoggerFactory.getLogger(getClass()).error("Could not process cache message", e); + } + + /** + * Returns a version of this listener that only begins calling + * {@link #accept(org.apache.curator.x.async.modeled.cached.ModeledCacheListener.Type, org.apache.curator.x.async.modeled.ZPath, org.apache.zookeeper.data.Stat, Object)} + * once {@link #initialized()} has been called. i.e. changes that occur as the cache is initializing are not sent + * to the listener + * + * @return wrapped listener + */ + default ModeledCacheListener<T> postInitializedOnly() + { + return new ModeledCacheListener<T>() + { + private volatile boolean isInitialized = false; + + @Override + public void accept(Type type, ZPath path, Stat stat, T model) + { + if ( isInitialized ) + { + ModeledCacheListener.this.accept(type, path, stat, model); + } + } + + @Override + public void initialized() + { + isInitialized = true; + ModeledCacheListener.this.initialized(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java new file mode 100644 index 0000000..2a7fd5f --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.cached.ModeledCache; +import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; +import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> +{ + private final ModeledFramework<T> client; + private final ModeledCacheImpl<T> cache; + private final Executor executor; + private final boolean asyncDefaultMode; + + CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor) + { + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor, false); + } + + private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor, boolean asyncDefaultMode) + { + this.client = client; + this.cache = cache; + this.executor = executor; + this.asyncDefaultMode = asyncDefaultMode; + } + + @Override + public ModeledCache<T> cache() + { + return cache; + } + + @Override + public CachedModeledFramework<T> asyncDefault() + { + return new CachedModeledFrameworkImpl<>(client, cache, executor, true); + } + + @Override + public void start() + { + cache.start(); + } + + @Override + public void close() + { + cache.close(); + } + + @Override + public Listenable<ModeledCacheListener<T>> listenable() + { + return cache.listenable(); + } + + @Override + public CachedModeledFramework<T> cached() + { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public CachedModeledFramework<T> cached(ExecutorService executor) + { + throw new UnsupportedOperationException("Already a cached instance"); + } + + @Override + public VersionedModeledFramework<T> versioned() + { + return new VersionedModeledFrameworkImpl<>(this); + } + + @Override + public AsyncCuratorFramework unwrap() + { + return client.unwrap(); + } + + @Override + public ModelSpec<T> modelSpec() + { + return client.modelSpec(); + } + + @Override + public CachedModeledFramework<T> child(Object child) + { + return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor, asyncDefaultMode); + } + + @Override + public ModeledFramework<T> parent() + { + throw new UnsupportedOperationException("Not supported for CachedModeledFramework. Instead, call parent() on the ModeledFramework before calling cached()"); + } + + @Override + public CachedModeledFramework<T> withPath(ZPath path) + { + return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor, asyncDefaultMode); + } + + @Override + public AsyncStage<String> set(T model) + { + return client.set(model); + } + + @Override + public AsyncStage<String> set(T model, Stat storingStatIn) + { + return client.set(model, storingStatIn); + } + + @Override + public AsyncStage<String> set(T model, Stat storingStatIn, int version) + { + return client.set(model, storingStatIn, version); + } + + @Override + public AsyncStage<String> set(T model, int version) + { + return client.set(model, version); + } + + @Override + public AsyncStage<T> read() + { + return internalRead(ZNode::model, this::exceptionally); + } + + @Override + public AsyncStage<T> read(Stat storingStatIn) + { + return internalRead(n -> { + if ( storingStatIn != null ) + { + DataTree.copyStat(n.stat(), storingStatIn); + } + return n.model(); + }, this::exceptionally); + } + + @Override + public AsyncStage<ZNode<T>> readAsZNode() + { + return internalRead(Function.identity(), this::exceptionally); + } + + @Override + public AsyncStage<T> readThrough() + { + return internalRead(ZNode::model, client::read); + } + + @Override + public AsyncStage<T> readThrough(Stat storingStatIn) + { + return internalRead(ZNode::model, () -> client.read(storingStatIn)); + } + + @Override + public AsyncStage<ZNode<T>> readThroughAsZNode() + { + return internalRead(Function.identity(), client::readAsZNode); + } + + @Override + public AsyncStage<List<T>> list() + { + List<T> children = cache.currentChildren() + .values() + .stream() + .map(ZNode::model) + .collect(Collectors.toList()); + return asyncDefaultMode ? ModelStage.asyncCompleted(children, executor) : ModelStage.completed(children); + } + + @Override + public AsyncStage<Stat> update(T model) + { + return client.update(model); + } + + @Override + public AsyncStage<Stat> update(T model, int version) + { + return client.update(model, version); + } + + @Override + public AsyncStage<Void> delete() + { + return client.delete(); + } + + @Override + public AsyncStage<Void> delete(int version) + { + return client.delete(version); + } + + @Override + public AsyncStage<Stat> checkExists() + { + ZPath path = client.modelSpec().path(); + Optional<ZNode<T>> data = cache.currentData(path); + return data.map(node -> completed(node.stat())).orElseGet(() -> completed(null)); + } + + @Override + public AsyncStage<List<ZPath>> children() + { + List<ZPath> paths = cache.currentChildren(client.modelSpec().path()) + .keySet() + .stream() + .filter(path -> path.equals(cache.basePath())) + .collect(Collectors.toList()); + return completed(paths); + } + + @Override + public AsyncStage<List<ZNode<T>>> childrenAsZNodes() + { + List<ZNode<T>> nodes = cache.currentChildren(client.modelSpec().path()) + .entrySet() + .stream() + .filter(e -> e.getKey().equals(cache.basePath())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return completed(nodes); + } + + @Override + public CuratorOp createOp(T model) + { + return client.createOp(model); + } + + @Override + public CuratorOp updateOp(T model) + { + return client.updateOp(model); + } + + @Override + public CuratorOp updateOp(T model, int version) + { + return client.updateOp(model, version); + } + + @Override + public CuratorOp deleteOp() + { + return client.deleteOp(); + } + + @Override + public CuratorOp deleteOp(int version) + { + return client.deleteOp(version); + } + + @Override + public CuratorOp checkExistsOp() + { + return client.checkExistsOp(); + } + + @Override + public CuratorOp checkExistsOp(int version) + { + return client.checkExistsOp(version); + } + + @Override + public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations) + { + return client.inTransaction(operations); + } + + private <U> AsyncStage<U> completed(U value) + { + return asyncDefaultMode ? ModelStage.asyncCompleted(value, executor) : ModelStage.completed(value); + } + + private <U> AsyncStage<U> exceptionally() + { + KeeperException.NoNodeException exception = new KeeperException.NoNodeException(client.modelSpec().path().fullPath()); + return asyncDefaultMode ? ModelStage.asyncExceptionally(exception, executor) : ModelStage.exceptionally(exception); + } + + private <U> AsyncStage<U> internalRead(Function<ZNode<T>, U> resolver, Supplier<AsyncStage<U>> elseProc) + { + ZPath path = client.modelSpec().path(); + Optional<ZNode<T>> data = cache.currentData(path); + return data.map(node -> completed(resolver.apply(node))) + .orElseGet(elseProc); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java new file mode 100644 index 0000000..58405eb --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.curator.framework.schema.Schema; +import org.apache.curator.framework.schema.SchemaValidator; +import org.apache.curator.framework.schema.SchemaViolation; +import org.apache.curator.x.async.api.CreateOption; +import org.apache.curator.x.async.api.DeleteOption; +import org.apache.curator.x.async.modeled.ModelSerializer; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator +{ + private final ZPath path; + private final ModelSerializer<T> serializer; + private final CreateMode createMode; + private final List<ACL> aclList; + private final Set<CreateOption> createOptions; + private final Set<DeleteOption> deleteOptions; + private final long ttl; + private volatile Schema schema = null; + + public ModelSpecImpl(ZPath path, ModelSerializer<T> serializer, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, long ttl) + { + this.path = Objects.requireNonNull(path, "path cannot be null"); + this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null"); + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + this.aclList = ImmutableList.copyOf(Objects.requireNonNull(aclList, "aclList cannot be null")); + this.createOptions = ImmutableSet.copyOf(Objects.requireNonNull(createOptions, "createOptions cannot be null")); + this.deleteOptions = ImmutableSet.copyOf(Objects.requireNonNull(deleteOptions, "deleteOptions cannot be null")); + this.ttl = ttl; + } + + @Override + public ModelSpec<T> child(Object child) + { + return withPath(path.child(child)); + } + + @Override + public ModelSpec<T> parent() + { + return withPath(path.parent()); + } + + @Override + public ModelSpec<T> resolved(Object... parameters) + { + return withPath(path.resolved(parameters)); + } + + @Override + public ModelSpec<T> resolved(List<Object> parameters) + { + return withPath(path.resolved(parameters)); + } + + @Override + public ModelSpec<T> withPath(ZPath newPath) + { + return new ModelSpecImpl<>(newPath, serializer, createMode, aclList, createOptions, deleteOptions, ttl); + } + + @Override + public ZPath path() + { + return path; + } + + @Override + public ModelSerializer<T> serializer() + { + return serializer; + } + + @Override + public CreateMode createMode() + { + return createMode; + } + + @Override + public List<ACL> aclList() + { + return aclList; + } + + @Override + public Set<CreateOption> createOptions() + { + return createOptions; + } + + @Override + public Set<DeleteOption> deleteOptions() + { + return deleteOptions; + } + + @Override + public long ttl() + { + return ttl; + } + + @Override + public Schema schema() + { + if ( schema == null ) + { + schema = Schema.builder(path.toSchemaPathPattern()) + .dataValidator(this) + .ephemeral(createMode.isEphemeral() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT) + .canBeDeleted(true) + .sequential(createMode.isSequential() ? Schema.Allowance.MUST : Schema.Allowance.CANNOT) + .watched(Schema.Allowance.CAN) + .build(); + } + return schema; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + ModelSpecImpl<?> modelSpec = (ModelSpecImpl<?>)o; + + if ( ttl != modelSpec.ttl ) + { + return false; + } + if ( !path.equals(modelSpec.path) ) + { + return false; + } + if ( !serializer.equals(modelSpec.serializer) ) + { + return false; + } + if ( createMode != modelSpec.createMode ) + { + return false; + } + if ( !aclList.equals(modelSpec.aclList) ) + { + return false; + } + if ( !createOptions.equals(modelSpec.createOptions) ) + { + return false; + } + //noinspection SimplifiableIfStatement + if ( !deleteOptions.equals(modelSpec.deleteOptions) ) + { + return false; + } + return schema.equals(modelSpec.schema); + } + + @Override + public int hashCode() + { + int result = path.hashCode(); + result = 31 * result + serializer.hashCode(); + result = 31 * result + createMode.hashCode(); + result = 31 * result + aclList.hashCode(); + result = 31 * result + createOptions.hashCode(); + result = 31 * result + deleteOptions.hashCode(); + result = 31 * result + (int)(ttl ^ (ttl >>> 32)); + result = 31 * result + schema.hashCode(); + return result; + } + + @Override + public String toString() + { + return "ModelSpecImpl{" + "path=" + path + ", serializer=" + serializer + ", createMode=" + createMode + ", aclList=" + aclList + ", createOptions=" + createOptions + ", deleteOptions=" + deleteOptions + ", ttl=" + ttl + ", schema=" + schema + '}'; + } + + @Override + public boolean isValid(Schema schema, String path, byte[] data, List<ACL> acl) + { + if ( acl != null ) + { + List<ACL> localAclList = (aclList.size() > 0) ? aclList : ZooDefs.Ids.OPEN_ACL_UNSAFE; + if ( !acl.equals(localAclList) ) + { + throw new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "ACLs do not match model ACLs"); + } + } + + if ( data != null ) + { + try + { + serializer.deserialize(data); + } + catch ( RuntimeException e ) + { + throw new SchemaViolation(schema, new SchemaViolation.ViolatorData(path, data, acl), "Data cannot be deserialized into a model"); + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java new file mode 100644 index 0000000..27047ec --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelStage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.x.async.AsyncStage; +import org.apache.zookeeper.WatchedEvent; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +class ModelStage<T> extends CompletableFuture<T> implements AsyncStage<T> +{ + private final CompletionStage<WatchedEvent> event; + + static <U> ModelStage<U> make() + { + return new ModelStage<>(null); + } + + static <U> ModelStage<U> make(CompletionStage<WatchedEvent> event) + { + return new ModelStage<>(event); + } + + static <U> ModelStage<U> completed(U value) + { + ModelStage<U> stage = new ModelStage<>(null); + stage.complete(value); + return stage; + } + + static <U> ModelStage<U> exceptionally(Exception e) + { + ModelStage<U> stage = new ModelStage<>(null); + stage.completeExceptionally(e); + return stage; + } + + static <U> ModelStage<U> async(Executor executor) + { + return new AsyncModelStage<>(executor); + } + + static <U> ModelStage<U> asyncCompleted(U value, Executor executor) + { + ModelStage<U> stage = new AsyncModelStage<>(executor); + stage.complete(value); + return stage; + } + + static <U> ModelStage<U> asyncExceptionally(Exception e, Executor executor) + { + ModelStage<U> stage = new AsyncModelStage<>(executor); + stage.completeExceptionally(e); + return stage; + } + + @Override + public CompletionStage<WatchedEvent> event() + { + return event; + } + + private ModelStage(CompletionStage<WatchedEvent> event) + { + this.event = event; + } + + private static class AsyncModelStage<U> extends ModelStage<U> + { + private final Executor executor; + + public AsyncModelStage(Executor executor) + { + super(null); + this.executor = executor; + } + + @Override + public <U1> CompletableFuture<U1> thenApplyAsync(Function<? super U, ? extends U1> fn) + { + return super.thenApplyAsync(fn, executor); + } + + @Override + public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> action) + { + return super.thenAcceptAsync(action, executor); + } + + @Override + public CompletableFuture<Void> thenRunAsync(Runnable action) + { + return super.thenRunAsync(action, executor); + } + + @Override + public <U1, V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U1> other, BiFunction<? super U, ? super U1, ? extends V> fn) + { + return super.thenCombineAsync(other, fn, executor); + } + + @Override + public <U1> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U1> other, BiConsumer<? super U, ? super U1> action) + { + return super.thenAcceptBothAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) + { + return super.runAfterBothAsync(other, action, executor); + } + + @Override + public <U1> CompletableFuture<U1> applyToEitherAsync(CompletionStage<? extends U> other, Function<? super U, U1> fn) + { + return super.applyToEitherAsync(other, fn, executor); + } + + @Override + public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends U> other, Consumer<? super U> action) + { + return super.acceptEitherAsync(other, action, executor); + } + + @Override + public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) + { + return super.runAfterEitherAsync(other, action, executor); + } + + @Override + public <U1> CompletableFuture<U1> thenComposeAsync(Function<? super U, ? extends CompletionStage<U1>> fn) + { + return super.thenComposeAsync(fn, executor); + } + + @Override + public CompletableFuture<U> whenCompleteAsync(BiConsumer<? super U, ? super Throwable> action) + { + return super.whenCompleteAsync(action, executor); + } + + @Override + public <U1> CompletableFuture<U1> handleAsync(BiFunction<? super U, Throwable, ? extends U1> fn) + { + return super.handleAsync(fn, executor); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java new file mode 100644 index 0000000..72e6762 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.x.async.api.CreateOption; +import org.apache.curator.x.async.modeled.ModelSerializer; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.cached.ModeledCache; +import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.zookeeper.data.Stat; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> +{ + private final TreeCache cache; + private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>(); + private final ModelSerializer<T> serializer; + private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>(); + private final ZPath basePath; + + private static final class Entry<T> + { + final Stat stat; + final T model; + + Entry(Stat stat, T model) + { + this.stat = stat; + this.model = model; + } + } + + ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor) + { + if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() ) + { + modelSpec = modelSpec.parent(); // i.e. the last item is a parameter + } + + basePath = modelSpec.path(); + this.serializer = modelSpec.serializer(); + cache = TreeCache.newBuilder(client, basePath.fullPath()) + .setCacheData(false) + .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) + .setExecutor(executor) + .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) + .build(); + } + + public void start() + { + try + { + cache.getListenable().addListener(this); + cache.start(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + public void close() + { + cache.getListenable().removeListener(this); + cache.close(); + entries.clear(); + } + + @Override + public Optional<ZNode<T>> currentData(ZPath path) + { + Entry<T> entry = entries.remove(path); + if ( entry != null ) + { + return Optional.of(new ZNodeImpl<>(path, entry.stat, entry.model)); + } + return Optional.empty(); + } + + ZPath basePath() + { + return basePath; + } + + Map<ZPath, ZNode<T>> currentChildren() + { + return currentChildren(basePath); + } + + @Override + public Map<ZPath, ZNode<T>> currentChildren(ZPath path) + { + return entries.entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(path)) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new ZNodeImpl<>(entry.getKey(), entry.getValue().stat, entry.getValue().model))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Listenable<ModeledCacheListener<T>> listenable() + { + return listenerContainer; + } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) + { + try + { + internalChildEvent(event); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + + listenerContainer.forEach(l -> { + l.handleException(e); + return null; + }); + } + } + + private void internalChildEvent(TreeCacheEvent event) throws Exception + { + switch ( event.getType() ) + { + case NODE_ADDED: + case NODE_UPDATED: + { + ZPath path = ZPath.parse(event.getData().getPath()); + if ( !path.equals(basePath) ) + { + byte[] bytes = event.getData().getData(); + if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created + { + T model = serializer.deserialize(bytes); + entries.put(path, new Entry<>(event.getData().getStat(), model)); + ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; + accept(type, path, event.getData().getStat(), model); + } + } + break; + } + + case NODE_REMOVED: + { + ZPath path = ZPath.parse(event.getData().getPath()); + if ( !path.equals(basePath) ) + { + Entry<T> entry = entries.remove(path); + T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); + Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); + accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model); + } + break; + } + + case INITIALIZED: + { + listenerContainer.forEach(l -> { + l.initialized(); + return null; + }); + break; + } + + default: + // ignore + break; + } + } + + private void accept(ModeledCacheListener.Type type, ZPath path, Stat stat, T model) + { + listenerContainer.forEach(l -> { + l.accept(type, path, stat, model); + return null; + }); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java new file mode 100644 index 0000000..c1d19c4 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.WatchMode; +import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl; +import org.apache.curator.x.async.api.AsyncPathAndBytesable; +import org.apache.curator.x.async.api.AsyncPathable; +import org.apache.curator.x.async.api.AsyncTransactionSetDataBuilder; +import org.apache.curator.x.async.api.CreateOption; +import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +public class ModeledFrameworkImpl<T> implements ModeledFramework<T> +{ + private final AsyncCuratorFramework client; + private final WatchableAsyncCuratorFramework watchableClient; + private final ModelSpec<T> modelSpec; + private final WatchMode watchMode; + private final UnaryOperator<WatchedEvent> watcherFilter; + private final UnhandledErrorListener unhandledErrorListener; + private final UnaryOperator<CuratorEvent> resultFilter; + private final AsyncCuratorFrameworkDsl dslClient; + private final boolean isWatched; + + public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter) + { + boolean isWatched = (watchMode != null); + + Objects.requireNonNull(client, "client cannot be null"); + Objects.requireNonNull(model, "model cannot be null"); + + watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess; + + AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter); + WatchableAsyncCuratorFramework watchableClient = isWatched ? dslClient.watched() : dslClient; + + return new ModeledFrameworkImpl<>( + client, + dslClient, + watchableClient, + model, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + isWatched + ); + } + + private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched) + { + this.client = client; + this.dslClient = dslClient; + this.watchableClient = watchableClient; + this.modelSpec = modelSpec; + this.watchMode = watchMode; + this.watcherFilter = watcherFilter; + this.unhandledErrorListener = unhandledErrorListener; + this.resultFilter = resultFilter; + this.isWatched = isWatched; + } + + @Override + public CachedModeledFramework<T> cached() + { + return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework")); + } + + @Override + public CachedModeledFramework<T> cached(ExecutorService executor) + { + Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers."); + return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null")); + } + + @Override + public VersionedModeledFramework<T> versioned() + { + return new VersionedModeledFrameworkImpl<>(this); + } + + @Override + public ModelSpec<T> modelSpec() + { + return modelSpec; + } + + @Override + public AsyncCuratorFramework unwrap() + { + return client; + } + + @Override + public AsyncStage<String> set(T item) + { + return set(item, null, -1); + } + + @Override + public AsyncStage<String> set(T item, Stat storingStatIn) + { + return set(item, storingStatIn, -1); + } + + @Override + public AsyncStage<String> set(T item, int version) + { + return set(item, null, -1); + } + + @Override + public AsyncStage<String> set(T item, Stat storingStatIn, int version) + { + try + { + byte[] bytes = modelSpec.serializer().serialize(item); + return dslClient.create() + .withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl(), version) + .forPath(resolveForSet(item), bytes); + } + catch ( Exception e ) + { + return ModelStage.exceptionally(e); + } + } + + @Override + public AsyncStage<T> read() + { + return internalRead(ZNode::model, null); + } + + @Override + public AsyncStage<T> read(Stat storingStatIn) + { + return internalRead(ZNode::model, storingStatIn); + } + + @Override + public AsyncStage<ZNode<T>> readAsZNode() + { + return internalRead(Function.identity(), null); + } + + @Override + public AsyncStage<Stat> update(T item) + { + return update(item, -1); + } + + @Override + public AsyncStage<Stat> update(T item, int version) + { + try + { + byte[] bytes = modelSpec.serializer().serialize(item); + AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? dslClient.setData().compressedWithVersion(version) : dslClient.setData(); + return next.forPath(resolveForSet(item), bytes); + } + catch ( Exception e ) + { + return ModelStage.exceptionally(e); + } + } + + @Override + public AsyncStage<Stat> checkExists() + { + return watchableClient.checkExists().forPath(modelSpec.path().fullPath()); + } + + @Override + public AsyncStage<Void> delete() + { + return delete(-1); + } + + @Override + public AsyncStage<Void> delete(int version) + { + return dslClient.delete().withVersion(-1).forPath(modelSpec.path().fullPath()); + } + + @Override + public AsyncStage<List<ZPath>> children() + { + return internalGetChildren(modelSpec.path()); + } + + @Override + public AsyncStage<List<ZNode<T>>> childrenAsZNodes() + { + ModelStage<List<ZNode<T>>> modelStage = ModelStage.make(); + Preconditions.checkState(!isWatched, "childrenAsZNodes() cannot be used with watched instances."); + children().handle((children, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + completeChildrenAsZNodes(modelStage, children); + } + return null; + }); + return modelStage; + } + + private void completeChildrenAsZNodes(ModelStage<List<ZNode<T>>> modelStage, List<ZPath> children) + { + List<ZNode<T>> nodes = Lists.newArrayList(); + if ( children.size() == 0 ) + { + modelStage.complete(nodes); + return; + } + children.forEach(path -> withPath(path).readAsZNode().handle((node, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + nodes.add(node); + if ( nodes.size() == children.size() ) + { + modelStage.complete(nodes); + } + } + return null; + })); + } + + private AsyncStage<List<ZPath>> internalGetChildren(ZPath path) + { + AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(path.fullPath()); + ModelStage<List<ZPath>> modelStage = ModelStage.make(asyncStage.event()); + asyncStage.whenComplete((children, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + modelStage.complete(children.stream().map(path::child).collect(Collectors.toList())); + } + }); + return modelStage; + } + + @Override + public ModeledFramework<T> parent() + { + ModelSpec<T> newModelSpec = modelSpec.parent(); + return new ModeledFrameworkImpl<>( + client, + dslClient, + watchableClient, + newModelSpec, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + isWatched + ); + } + + @Override + public ModeledFramework<T> child(Object child) + { + ModelSpec<T> newModelSpec = modelSpec.child(child); + return new ModeledFrameworkImpl<>( + client, + dslClient, + watchableClient, + newModelSpec, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + isWatched + ); + } + + @Override + public ModeledFramework<T> withPath(ZPath path) + { + ModelSpec<T> newModelSpec = modelSpec.withPath(path); + return new ModeledFrameworkImpl<>( + client, + dslClient, + watchableClient, + newModelSpec, + watchMode, + watcherFilter, + unhandledErrorListener, + resultFilter, + isWatched + ); + } + + public static boolean isCompressed(Set<CreateOption> createOptions) + { + return createOptions.contains(CreateOption.compress); + } + + @Override + public CuratorOp createOp(T model) + { + return client.transactionOp() + .create() + .withOptions(modelSpec.createMode(), fixAclList(modelSpec.aclList()), modelSpec.createOptions().contains(CreateOption.compress), modelSpec.ttl()) + .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + } + + @Override + public CuratorOp updateOp(T model) + { + return updateOp(model, -1); + } + + @Override + public CuratorOp updateOp(T model, int version) + { + AsyncTransactionSetDataBuilder builder = client.transactionOp().setData(); + if ( isCompressed() ) + { + return builder.withVersionCompressed(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + } + return builder.withVersion(version).forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + } + + @Override + public CuratorOp deleteOp() + { + return deleteOp(-1); + } + + @Override + public CuratorOp deleteOp(int version) + { + return client.transactionOp().delete().withVersion(version).forPath(modelSpec.path().fullPath()); + } + + @Override + public CuratorOp checkExistsOp() + { + return checkExistsOp(-1); + } + + @Override + public CuratorOp checkExistsOp(int version) + { + return client.transactionOp().check().withVersion(version).forPath(modelSpec.path().fullPath()); + } + + @Override + public AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations) + { + return client.transaction().forOperations(operations); + } + + private boolean isCompressed() + { + return modelSpec.createOptions().contains(CreateOption.compress); + } + + private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver, Stat storingStatIn) + { + Stat stat = (storingStatIn != null) ? storingStatIn : new Stat(); + AsyncPathable<AsyncStage<byte[]>> next = isCompressed() ? watchableClient.getData().decompressedStoringStatIn(stat) : watchableClient.getData().storingStatIn(stat); + AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath()); + ModelStage<U> modelStage = ModelStage.make(asyncStage.event()); + asyncStage.whenComplete((value, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + try + { + ZNode<T> node = new ZNodeImpl<>(modelSpec.path(), stat, modelSpec.serializer().deserialize(value)); + modelStage.complete(resolver.apply(node)); + } + catch ( Exception deserializeException ) + { + modelStage.completeExceptionally(deserializeException); + } + } + }); + return modelStage; + } + + private String resolveForSet(T model) + { + if ( modelSpec.path().isResolved() ) + { + return modelSpec.path().fullPath(); + } + return modelSpec.path().resolved(model).fullPath(); + } + + private List<ACL> fixAclList(List<ACL> aclList) + { + return (aclList.size() > 0) ? aclList : null; // workaround for old, bad design. empty list not accepted + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java new file mode 100644 index 0000000..89d7615 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/VersionedModeledFrameworkImpl.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.versioned.Versioned; +import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; +import org.apache.zookeeper.data.Stat; + +class VersionedModeledFrameworkImpl<T> implements VersionedModeledFramework<T> +{ + private final ModeledFramework<T> client; + + VersionedModeledFrameworkImpl(ModeledFramework<T> client) + { + this.client = client; + } + + @Override + public AsyncStage<String> set(Versioned<T> model) + { + return client.set(model.model(), model.version()); + } + + @Override + public AsyncStage<String> set(Versioned<T> model, Stat storingStatIn) + { + return client.set(model.model(), storingStatIn, model.version()); + } + + @Override + public AsyncStage<Versioned<T>> read() + { + return read(null); + } + + @Override + public AsyncStage<Versioned<T>> read(Stat storingStatIn) + { + Stat localStat = (storingStatIn != null) ? storingStatIn : new Stat(); + AsyncStage<T> stage = client.read(localStat); + ModelStage<Versioned<T>> modelStage = ModelStage.make(stage.event()); + stage.whenComplete((model, e) -> { + if ( e != null ) + { + modelStage.completeExceptionally(e); + } + else + { + modelStage.complete(Versioned.from(model, localStat.getVersion())); + } + }); + return modelStage; + } + + @Override + public AsyncStage<Stat> update(Versioned<T> model) + { + return client.update(model.model(), model.version()); + } + + @Override + public CuratorOp updateOp(Versioned<T> model) + { + return client.updateOp(model.model(), model.version()); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java new file mode 100644 index 0000000..85bedf4 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZNodeImpl.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import org.apache.curator.x.async.modeled.ZNode; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.data.Stat; +import java.util.Objects; + +public class ZNodeImpl<T> implements ZNode<T> +{ + private final ZPath path; + private final Stat stat; + private final T model; + + public ZNodeImpl(ZPath path, Stat stat, T model) + { + this.path = Objects.requireNonNull(path, "path cannot be null"); + this.stat = Objects.requireNonNull(stat, "stat cannot be null"); + this.model = Objects.requireNonNull(model, "model cannot be null"); + } + + @Override + public ZPath path() + { + return path; + } + + @Override + public Stat stat() + { + return stat; + } + + @Override + public T model() + { + return model; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java new file mode 100644 index 0000000..fff742e --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.details; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import org.apache.curator.x.async.modeled.NodeName; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.common.PathUtils; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.UnaryOperator; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.curator.utils.ZKPaths.PATH_SEPARATOR; + +public class ZPathImpl implements ZPath +{ + public static final ZPath root = new ZPathImpl(Collections.singletonList(PATH_SEPARATOR), null); + + private final List<String> nodes; + private final boolean isResolved; + private volatile String fullPath = null; + private volatile ZPath parent = null; + private volatile Pattern schema = null; + + public static ZPath parse(String fullPath, UnaryOperator<String> nameFilter) + { + return parseInternal(fullPath, nameFilter); + } + + private static ZPathImpl parseInternal(String fullPath, UnaryOperator<String> nameFilter) + { + List<String> nodes = ImmutableList.<String>builder() + .add(PATH_SEPARATOR) + .addAll( + Splitter.on(PATH_SEPARATOR) + .omitEmptyStrings() + .splitToList(fullPath) + .stream() + .map(nameFilter) + .collect(Collectors.toList()) + ) + .build(); + nodes.forEach(ZPathImpl::validate); + return new ZPathImpl(nodes, null); + } + + public static ZPath from(String[] names) + { + return from(null, Arrays.asList(names)); + } + + public static ZPath from(List<String> names) + { + return from(null, names); + } + + public static ZPath from(ZPath base, String[] names) + { + return from(base, Arrays.asList(names)); + } + + public static ZPath from(ZPath base, List<String> names) + { + names = Objects.requireNonNull(names, "names cannot be null"); + names.forEach(ZPathImpl::validate); + ImmutableList.Builder<String> builder = ImmutableList.builder(); + if ( base != null ) + { + if ( base instanceof ZPathImpl ) + { + builder.addAll(((ZPathImpl)base).nodes); + } + else + { + builder.addAll(Splitter.on(PATH_SEPARATOR).omitEmptyStrings().splitToList(base.fullPath())); + } + } + else + { + builder.add(PATH_SEPARATOR); + } + List<String> nodes = builder.addAll(names).build(); + return new ZPathImpl(nodes, null); + } + + @Override + public ZPath child(Object child) + { + return new ZPathImpl(nodes, NodeName.nameFrom(child)); + } + + @Override + public ZPath parent() + { + checkRootAccess(); + if ( parent == null ) + { + parent = new ZPathImpl(nodes.subList(0, nodes.size() - 1), null); + } + return parent; + } + + @Override + public boolean isRoot() + { + return nodes.size() == 1; + } + + @Override + public boolean startsWith(ZPath path) + { + ZPathImpl rhs; + if ( path instanceof ZPathImpl ) + { + rhs = (ZPathImpl)path; + } + else + { + rhs = parseInternal(path.fullPath(), s -> s); + } + return (nodes.size() >= rhs.nodes.size()) && nodes.subList(0, rhs.nodes.size()).equals(rhs.nodes); + } + + @Override + public Pattern toSchemaPathPattern() + { + if ( schema == null ) + { + schema = Pattern.compile(buildFullPath(s -> isParameter(s) ? ".*" : s)); + } + return schema; + } + + @Override + public String fullPath() + { + checkResolved(); + if ( fullPath == null ) + { + fullPath = buildFullPath(s -> s); + } + return fullPath; + } + + @Override + public String nodeName() + { + return nodes.get(nodes.size() - 1); + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + ZPathImpl zPaths = (ZPathImpl)o; + + return nodes.equals(zPaths.nodes); + } + + @Override + public int hashCode() + { + return nodes.hashCode(); + } + + @Override + public String toString() + { + return nodes.subList(1, nodes.size()) + .stream().map(name -> isParameter(name) ? name.substring(1) : name) + .collect(Collectors.joining(PATH_SEPARATOR, PATH_SEPARATOR, "")); + } + + @Override + public ZPath resolved(List<Object> parameters) + { + Iterator<Object> iterator = parameters.iterator(); + List<String> nodeNames = nodes.stream() + .map(name -> { + if ( isParameter(name) && iterator.hasNext() ) + { + return NodeName.nameFrom(iterator.next()); + } + return name; + }) + .collect(Collectors.toList()); + return new ZPathImpl(nodeNames, null); + } + + @Override + public boolean isResolved() + { + return isResolved; + } + + private static boolean isParameter(String name) + { + return (name.length() > 1) && name.startsWith(PATH_SEPARATOR); + } + + private ZPathImpl(List<String> nodes, String child) + { + ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(nodes); + if ( child != null ) + { + validate(child); + builder.add(child); + } + this.nodes = builder.build(); + isResolved = this.nodes.stream().noneMatch(ZPathImpl::isParameter); + } + + private void checkRootAccess() + { + if ( isRoot() ) + { + throw new NoSuchElementException("The root has no parent"); + } + } + + private void checkResolved() + { + if ( !isResolved) + { + throw new IllegalStateException("This ZPath has not been resolved: " + toString()); + } + } + + private static void validate(String nodeName) + { + if ( isParameter(Objects.requireNonNull(nodeName, "nodeName cannot be null")) ) + { + return; + } + if ( nodeName.equals(PATH_SEPARATOR) ) + { + return; + } + PathUtils.validatePath(PATH_SEPARATOR + nodeName); + } + + private String buildFullPath(UnaryOperator<String> filter) + { + boolean addSeparator = false; + StringBuilder str = new StringBuilder(); + int size = nodes.size(); + int parameterIndex = 0; + for ( int i = 0; i < size; ++i ) + { + if ( i > 1 ) + { + str.append(PATH_SEPARATOR); + } + str.append(filter.apply(nodes.get(i))); + } + return str.toString(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java new file mode 100644 index 0000000..3fa9831 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.typed; + +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModelSpecBuilder; + +/** + * <p> + * Abstraction that allows the construction of ModelSpecs using strongly typed parameter replacements. + * For example, given a ModelSpec with a path such as "/root/registry/people/{id}" where "id" should + * be <code>PersonId</code>. + * </p> + * + * <p> + * <pre><code> + * // Step 1. Create a typed ZPath + * TypedZPath<PersonId> typedPath = TypedZPath.from("/root/registry/people/{id}"); + * + * // Step 2. Create a ModelSpec builder (do not build at this point) + * ModelSpecBuilder<Person> builder = ModelSpec.builder(JacksonModelSerializer.build(Person.class)) + * + * // Step 3. Create a typed ModelSpec using the typed ZPath and ModelSpec builder + * TypedModelSpec<Person, PersonId> typedModelSpec = TypedModelSpec.from(builder, path); + * + * // later on the TypedModelSpec can be resolved into a useable ModelSpec + * ModelSpec<Person> modelSpec = typedModelSpec.resolve(personId); + * </pre></code> + * </p> + */ +@FunctionalInterface +public interface TypedModelSpec<M, P1> +{ + /** + * Resolve into a ZPath using the given parameter + * + * @param p1 the parameter + * @return ZPath + */ + ModelSpec<M> resolved(P1 p1); + + /** + * Return a new TypedModelSpec using the given model spec builder and typed path. When + * {@link #resolved(Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param path typed path + * @return new TypedModelSpec + */ + static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, TypedZPath<P1> path) + { + return p1 -> builder.withPath(path.resolved(p1)).build(); + } + + /** + * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath + * is created from the given full path and When + * {@link #resolved(Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param pathWithIds typed path + * @return new TypedModelSpec + */ + static <M, P1> TypedModelSpec<M, P1> from(ModelSpecBuilder<M> builder, String pathWithIds) + { + TypedZPath<P1> zPath = TypedZPath.from(pathWithIds); + return p1 -> builder.withPath(zPath.resolved(p1)).build(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java new file mode 100644 index 0000000..dee3506 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec0.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.typed; + +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModelSpecBuilder; + +/** + * Same as {@link TypedModelSpec}, but with 0 parameters + */ +@FunctionalInterface +public interface TypedModelSpec0<M> +{ + ModelSpec<M> resolved(); + + /** + * Return a new TypedModelSpec using the given model spec builder and typed path. When + * {@link #resolved()} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param path typed path + * @return new TypedModelSpec + */ + static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, TypedZPath0 path) + { + return () -> builder.withPath(path.resolved()).build(); + } + + /** + * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath + * is created from the given full path and When + * {@link #resolved()} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param pathWithIds typed path + * @return new TypedModelSpec + */ + static <M> TypedModelSpec0<M> from(ModelSpecBuilder<M> builder, String pathWithIds) + { + TypedZPath0 zPath = TypedZPath0.from(pathWithIds); + return () -> builder.withPath(zPath.resolved()).build(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java new file mode 100644 index 0000000..1b00d66 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec10.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.typed; + +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModelSpecBuilder; + +/** + * Same as {@link org.apache.curator.x.async.modeled.typed.TypedModelSpec}, but with 10 parameters + */ +@FunctionalInterface +public interface TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> +{ + ModelSpec<M> resolved(P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10); + + /** + * Return a new TypedModelSpec using the given model spec builder and typed path. When + * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param path typed path + * @return new TypedModelSpec + */ + static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> path) + { + return (p1, p2, p3, p4, p5, p6, p7, p8, p9, p10) -> builder.withPath(path.resolved(p1, p2, p3, p4, p5, p6, p7, p8, p9, p10)).build(); + } + + /** + * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath + * is created from the given full path and When + * {@link #resolved(Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param pathWithIds typed path + * @return new TypedModelSpec + */ + static <M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> TypedModelSpec10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> from(ModelSpecBuilder<M> builder, String pathWithIds) + { + TypedZPath10<P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> zPath = TypedZPath10.from(pathWithIds); + return (p1, p2, p3, p4, p5, p6, p7, p8, p9, p10) -> builder.withPath(zPath.resolved(p1, p2, p3, p4, p5, p6, p7, p8, p9, p10)).build(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/0f5d10da/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java new file mode 100644 index 0000000..a56e139 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModelSpec2.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.typed; + +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModelSpecBuilder; + +/** + * Same as {@link org.apache.curator.x.async.modeled.typed.TypedModelSpec}, but with 2 parameters + */ +@FunctionalInterface +public interface TypedModelSpec2<M, P1, P2> +{ + ModelSpec<M> resolved(P1 p1, P2 p2); + + /** + * Return a new TypedModelSpec using the given model spec builder and typed path. When + * {@link #resolved(Object, Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param path typed path + * @return new TypedModelSpec + */ + static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, TypedZPath2<P1, P2> path) + { + return (p1, p2) -> builder.withPath(path.resolved(p1, p2)).build(); + } + + /** + * Return a new TypedModelSpec using the given model spec builder and path. A TypedZPath + * is created from the given full path and When + * {@link #resolved(Object, Object)} is called the actual model spec is generated with the + * resolved path + * + * @param builder model spec builder + * @param pathWithIds typed path + * @return new TypedModelSpec + */ + static <M, P1, P2> TypedModelSpec2<M, P1, P2> from(ModelSpecBuilder<M> builder, String pathWithIds) + { + TypedZPath2<P1, P2> zPath = TypedZPath2.from(pathWithIds); + return (p1, p2) -> builder.withPath(zPath.resolved(p1, p2)).build(); + } +}
