Repository: curator Updated Branches: refs/heads/CURATOR-397 fa6bff6e5 -> 11cb97036
more examples, added transactions Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/11cb9703 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/11cb9703 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/11cb9703 Branch: refs/heads/CURATOR-397 Commit: 11cb97036ebc8639bd22799d99569511deac098b Parents: fa6bff6 Author: randgalt <[email protected]> Authored: Sat Apr 29 13:52:36 2017 -0500 Committer: randgalt <[email protected]> Committed: Sat Apr 29 13:52:36 2017 -0500 ---------------------------------------------------------------------- .../src/main/java/modeled/ContainerType.java | 68 ++++++++++++++ .../java/modeled/ModeledCuratorExamples.java | 4 +- .../java/modeled/ModeledCuratorExamplesAlt.java | 29 ++---- .../src/main/java/modeled/PersonId.java | 70 +++++++++++++++ .../src/main/java/modeled/PersonModel.java | 42 ++++++--- .../src/main/java/modeled/PersonModelSpec.java | 46 ++++++++++ .../api/AsyncTransactionCreateBuilder.java | 8 +- .../x/async/details/AsyncTransactionOpImpl.java | 8 +- .../async/modeled/ModeledCuratorFramework.java | 78 +++++++++++++++- .../x/async/modeled/details/CachingImpl.java | 10 +-- .../details/ModeledCuratorFrameworkImpl.java | 95 ++++++++++++++++---- .../modeled/TestModeledCuratorFramework.java | 12 +-- .../TestCachedModeledCuratorFramework.java | 6 +- .../modeled/recipes/TestModeledCaches.java | 6 +- 14 files changed, 395 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/ContainerType.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/ContainerType.java b/curator-examples/src/main/java/modeled/ContainerType.java new file mode 100644 index 0000000..a36cfaa --- /dev/null +++ 
b/curator-examples/src/main/java/modeled/ContainerType.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package modeled; + +public class ContainerType +{ + private final int typeId; + + public ContainerType() + { + this(0); + } + + public ContainerType(int typeId) + { + this.typeId = typeId; + } + + public int getTypeId() + { + return typeId; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + ContainerType that = (ContainerType)o; + + return typeId == that.typeId; + } + + @Override + public int hashCode() + { + return typeId; + } + + @Override + public String toString() + { + return "ContainerType{" + "typeId=" + typeId + '}'; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java b/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java index 6fc530f..aa37ab1 100644 --- 
a/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java +++ b/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java @@ -43,11 +43,11 @@ public class ModeledCuratorExamples public static void createOrUpdate(ModeledCuratorFramework<PersonModel> modeled, PersonModel model) { // change the affected path to be modeled's base path plus id: i.e. "/example/path/{id}" - ModeledCuratorFramework<PersonModel> atId = modeled.at(model.getId()); + ModeledCuratorFramework<PersonModel> atId = modeled.at(model.getId().getId()); // by default ModeledCuratorFramework instances update the node if it already exists // so this will either create or update the node - atId.create(model); // note - this is async + atId.set(model); // note - this is async } public static void readPerson(ModeledCuratorFramework<PersonModel> modeled, String id, Consumer<PersonModel> receiver) http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/ModeledCuratorExamplesAlt.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/ModeledCuratorExamplesAlt.java b/curator-examples/src/main/java/modeled/ModeledCuratorExamplesAlt.java index 025eb96..a8d9bcf 100644 --- a/curator-examples/src/main/java/modeled/ModeledCuratorExamplesAlt.java +++ b/curator-examples/src/main/java/modeled/ModeledCuratorExamplesAlt.java @@ -18,43 +18,24 @@ */ package modeled; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.x.async.modeled.CuratorModelSpec; -import org.apache.curator.x.async.modeled.JacksonModelSerializer; import org.apache.curator.x.async.modeled.ModeledCuratorFramework; -import org.apache.curator.x.async.modeled.ZPath; import java.util.function.Consumer; public class ModeledCuratorExamplesAlt { - public static ModeledCuratorFramework<PersonModel> wrap(CuratorFramework client) - { - JacksonModelSerializer<PersonModel> serializer = 
JacksonModelSerializer.build(PersonModel.class); - - ZPath path = ZPath.from("example", ZPath.parameterNodeName(), "path", ZPath.parameterNodeName()); - - // build a model specification - you can pre-build all the model specifications for your app at startup - CuratorModelSpec<PersonModel> modelSpec = CuratorModelSpec.builder(path, serializer).build(); - - // wrap a CuratorFramework instance so that it can be used "modeled". - // do this once and re-use the returned ModeledCuratorFramework instance. - // ModeledCuratorFramework instances are tied to a given path - return ModeledCuratorFramework.wrap(client, modelSpec); - } - - public static void createOrUpdate(ModeledCuratorFramework<PersonModel> modeled, String typeId, PersonModel model) + public static void createOrUpdate(PersonModelSpec modelSpec, PersonModel model) { // change the affected path to be modeled's base path plus id: i.e. "/example/path/{id}" - ModeledCuratorFramework<PersonModel> resolved = modeled.resolved(typeId, model.getId()); + ModeledCuratorFramework<PersonModel> resolved = modelSpec.resolved(model.getContainerType(), model.getId()); // by default ModeledCuratorFramework instances update the node if it already exists // so this will either create or update the node - resolved.create(model); // note - this is async + resolved.set(model); // note - this is async } - public static void readPerson(ModeledCuratorFramework<PersonModel> modeled, String typeId, String id, Consumer<PersonModel> receiver) + public static void readPerson(PersonModelSpec modelSpec, ContainerType containerType, PersonId id, Consumer<PersonModel> receiver) { - ModeledCuratorFramework<PersonModel> resolved = modeled.resolved(typeId, id); + ModeledCuratorFramework<PersonModel> resolved = modelSpec.resolved(containerType, id); // read the person with the given ID and asynchronously call the receiver after it is read resolved.read().whenComplete((person, exception) -> { 
http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/PersonId.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/PersonId.java b/curator-examples/src/main/java/modeled/PersonId.java new file mode 100644 index 0000000..eabc286 --- /dev/null +++ b/curator-examples/src/main/java/modeled/PersonId.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package modeled; + +import java.util.Objects; + +public class PersonId +{ + private final String id; + + public PersonId() + { + this(""); + } + + public PersonId(String id) + { + this.id = Objects.requireNonNull(id, "id cannot be null"); + } + + public String getId() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + PersonId personId = (PersonId)o; + + return id.equals(personId.id); + } + + @Override + public int hashCode() + { + return id.hashCode(); + } + + @Override + public String toString() + { + return "PersonId{" + "id='" + id + '\'' + '}'; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/PersonModel.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/PersonModel.java b/curator-examples/src/main/java/modeled/PersonModel.java index 7bb5a44..f9b9102 100644 --- a/curator-examples/src/main/java/modeled/PersonModel.java +++ b/curator-examples/src/main/java/modeled/PersonModel.java @@ -18,31 +18,40 @@ */ package modeled; +import java.util.Objects; + public class PersonModel { - private final String id; + private final PersonId id; + private final ContainerType containerType; private final String firstName; private final String lastName; private final int age; public PersonModel() { - this(null, null, null, 0); + this(new PersonId(), new ContainerType(), null, null, 0); } - public PersonModel(String id, String firstName, String lastName, int age) + public PersonModel(PersonId id, ContainerType containerType, String firstName, String lastName, int age) { - this.id = id; - this.firstName = firstName; - this.lastName = lastName; + this.id = Objects.requireNonNull(id, "id cannot be null"); + this.containerType = Objects.requireNonNull(containerType, "containerType cannot be null"); + 
this.firstName = Objects.requireNonNull(firstName, "firstName cannot be null"); + this.lastName = Objects.requireNonNull(lastName, "lastName cannot be null"); this.age = age; } - public String getId() + public PersonId getId() { return id; } + public ContainerType getContainerType() + { + return containerType; + } + public String getFirstName() { return firstName; @@ -76,24 +85,29 @@ public class PersonModel { return false; } - if ( id != null ? !id.equals(that.id) : that.id != null ) + if ( !id.equals(that.id) ) + { + return false; + } + if ( !containerType.equals(that.containerType) ) { return false; } //noinspection SimplifiableIfStatement - if ( firstName != null ? !firstName.equals(that.firstName) : that.firstName != null ) + if ( !firstName.equals(that.firstName) ) { return false; } - return lastName != null ? lastName.equals(that.lastName) : that.lastName == null; + return lastName.equals(that.lastName); } @Override public int hashCode() { - int result = id != null ? id.hashCode() : 0; - result = 31 * result + (firstName != null ? firstName.hashCode() : 0); - result = 31 * result + (lastName != null ? 
lastName.hashCode() : 0); + int result = id.hashCode(); + result = 31 * result + containerType.hashCode(); + result = 31 * result + firstName.hashCode(); + result = 31 * result + lastName.hashCode(); result = 31 * result + age; return result; } @@ -101,6 +115,6 @@ public class PersonModel @Override public String toString() { - return "PersonModel{" + "id='" + id + '\'' + ", firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + ", age=" + age + '}'; + return "PersonModel{" + "id=" + id + ", containerType=" + containerType + ", firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + ", age=" + age + '}'; } } http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-examples/src/main/java/modeled/PersonModelSpec.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/PersonModelSpec.java b/curator-examples/src/main/java/modeled/PersonModelSpec.java new file mode 100644 index 0000000..3f0198d --- /dev/null +++ b/curator-examples/src/main/java/modeled/PersonModelSpec.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package modeled; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.modeled.CuratorModelSpec; +import org.apache.curator.x.async.modeled.JacksonModelSerializer; +import org.apache.curator.x.async.modeled.ModeledCuratorFramework; +import org.apache.curator.x.async.modeled.ZPath; + +public class PersonModelSpec +{ + private final CuratorFramework client; + private final CuratorModelSpec<PersonModel> modelSpec; + + public PersonModelSpec(CuratorFramework client) + { + this.client = client; + + JacksonModelSerializer<PersonModel> serializer = JacksonModelSerializer.build(PersonModel.class); + ZPath path = ZPath.from("example", ZPath.parameterNodeName(), "path", ZPath.parameterNodeName()); + modelSpec = CuratorModelSpec.builder(path, serializer).build(); + } + + public ModeledCuratorFramework<PersonModel> resolved(ContainerType containerType, PersonId personId) + { + CuratorModelSpec<PersonModel> resolved = modelSpec.resolved(containerType.getTypeId(), personId.getId()); + return ModeledCuratorFramework.wrap(client, resolved); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java index 439db81..81da5c0 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java @@ -34,7 +34,7 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<Cur * @param createMode mode * @return this */ - AsyncPathable<CuratorOp> withMode(CreateMode createMode); + AsyncPathAndBytesable<CuratorOp> 
withMode(CreateMode createMode); /** * Set an ACL list (default is {@link org.apache.zookeeper.ZooDefs.Ids#OPEN_ACL_UNSAFE}) @@ -42,14 +42,14 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<Cur * @param aclList the ACL list to use * @return this */ - AsyncPathable<CuratorOp> withACL(List<ACL> aclList); + AsyncPathAndBytesable<CuratorOp> withACL(List<ACL> aclList); /** * Cause the data to be compressed using the configured compression provider * * @return this */ - AsyncPathable<CuratorOp> compressed(); + AsyncPathAndBytesable<CuratorOp> compressed(); /** * Specify mode, acl list and compression @@ -62,5 +62,5 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<Cur * @see #compressed() * @return this */ - AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed); + AsyncPathAndBytesable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed); } http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index 89f0a22..0be720f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -56,28 +56,28 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp private boolean compressed = false; @Override - public AsyncPathable<CuratorOp> withMode(CreateMode createMode) + public AsyncPathAndBytesable<CuratorOp> withMode(CreateMode createMode) { this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); return 
this; } @Override - public AsyncPathable<CuratorOp> withACL(List<ACL> aclList) + public AsyncPathAndBytesable<CuratorOp> withACL(List<ACL> aclList) { this.aclList = aclList; return this; } @Override - public AsyncPathable<CuratorOp> compressed() + public AsyncPathAndBytesable<CuratorOp> compressed() { compressed = true; return this; } @Override - public AsyncPathable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed) + public AsyncPathAndBytesable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed) { this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); this.aclList = aclList; http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java index 5008b86..a53eb77 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledCuratorFramework.java @@ -19,6 +19,8 @@ package org.apache.curator.x.async.modeled; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.modeled.caching.Caching; import org.apache.zookeeper.data.Stat; @@ -85,7 +87,7 @@ public interface ModeledCuratorFramework<T> * @return AsyncStage * @see org.apache.curator.x.async.AsyncStage */ - AsyncStage<String> create(T model); + AsyncStage<String> set(T model); /** * Create (or update depending on build options) a 
ZNode at this instance's path with a serialized @@ -96,7 +98,7 @@ public interface ModeledCuratorFramework<T> * @return AsyncStage * @see org.apache.curator.x.async.AsyncStage */ - AsyncStage<String> create(T model, Stat storingStatIn); + AsyncStage<String> set(T model, Stat storingStatIn); /** * Read the ZNode at this instance's path and deserialize into a model @@ -168,4 +170,76 @@ public interface ModeledCuratorFramework<T> * @see org.apache.curator.x.async.AsyncStage */ AsyncStage<List<ZPath>> getChildren(); + + /** + * Create operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. Note: + * due to ZooKeeper transaction limits, this is _not_ a "set or update" operation but only + * a create operation and will generate an error if the node already exists. + * + * @param model the model + * @return operation + */ + CuratorOp createOp(T model); + + /** + * Update operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. + * + * @param model the model + * @return operation + */ + CuratorOp updateOp(T model); + + /** + * Update operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. + * + * @param model the model + * @param version update version to use + * @return operation + */ + CuratorOp updateOp(T model, int version); + + /** + * Delete operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. + * + * @return operation + */ + CuratorOp deleteOp(); + + /** + * Delete operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. 
+ * + * @param version delete version to use + * @return operation + */ + CuratorOp deleteOp(int version); + + /** + * Check exists operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. + * + * @return operation + */ + CuratorOp checkExistsOp(); + + /** + * Check exists operation instance that can be passed among other operations to + * {@link #inTransaction(java.util.List)} to be executed as a single transaction. + * + * @param version version to use + * @return operation + */ + CuratorOp checkExistsOp(int version); + + /** + * Invoke ZooKeeper to commit the given operations as a single transaction. + * + * @param operations operations that make up the transaction. + * @return AsyncStage instance for managing the completion + */ + AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations); } http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java index a77972b..01566d1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachingImpl.java @@ -89,14 +89,12 @@ class CachingImpl<T> implements Caching<T>, ModeledCacheListener<T> { case NODE_ADDED: case NODE_UPDATED: - { - updateDirtyZxid(event.getNode().getStat().getMzxid()); - break; - } - case NODE_REMOVED: { - // TODO + ModeledCachedNode<T> node = event.getNode(); + Stat stat = (node != null) ? node.getStat() : null; + long mzxid = (stat != null) ? 
stat.getMzxid() : -1; + updateDirtyZxid(mzxid); break; } http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java index ebe9d62..b58e1e5 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCuratorFrameworkImpl.java @@ -23,12 +23,15 @@ import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.api.AsyncCuratorFrameworkDsl; import org.apache.curator.x.async.api.AsyncPathAndBytesable; import org.apache.curator.x.async.api.AsyncPathable; +import org.apache.curator.x.async.api.AsyncTransactionSetDataBuilder; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework; import org.apache.curator.x.async.modeled.CuratorModelSpec; @@ -52,7 +55,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T { private final AsyncCuratorFramework client; private final WatchableAsyncCuratorFramework watchableClient; - private final CuratorModelSpec<T> model; + private 
final CuratorModelSpec<T> modelSpec; private final WatchMode watchMode; private final UnaryOperator<WatchedEvent> watcherFilter; private final UnhandledErrorListener unhandledErrorListener; @@ -88,12 +91,12 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T ); } - private ModeledCuratorFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, CuratorModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CachingImpl<T> caching) + private ModeledCuratorFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, CuratorModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, CachingImpl<T> caching) { this.client = client; this.dslClient = dslClient; this.watchableClient = watchableClient; - this.model = model; + this.modelSpec = modelSpec; this.watchMode = watchMode; this.watcherFilter = watcherFilter; this.unhandledErrorListener = unhandledErrorListener; @@ -115,17 +118,17 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } @Override - public AsyncStage<String> create(T item) + public AsyncStage<String> set(T item) { - return create(item, null); + return set(item, null); } @Override - public AsyncStage<String> create(T item, Stat storingStatIn) + public AsyncStage<String> set(T item, Stat storingStatIn) { long dirtyZxid = getDirtyZxid(); - byte[] bytes = model.serializer().serialize(item); - AsyncStage<String> asyncStage = dslClient.create().withOptions(model.createOptions(), model.createMode(), fixAclList(model.aclList()), storingStatIn).forPath(model.path().fullPath(), bytes); + byte[] bytes = 
modelSpec.serializer().serialize(item); + AsyncStage<String> asyncStage = dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn).forPath(modelSpec.path().fullPath(), bytes); ModelStage<String> modelStage = new ModelStage<>(); markDirtyCompleter(dirtyZxid, asyncStage, modelStage); return modelStage; @@ -174,7 +177,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T { next = (storingStatIn != null) ? watchableClient.getData().storingStatIn(storingStatIn) : watchableClient.getData(); } - AsyncStage<byte[]> asyncStage = next.forPath(model.path().fullPath()); + AsyncStage<byte[]> asyncStage = next.forPath(modelSpec.path().fullPath()); ModelStage<T> modelStage = new ModelStage<>(asyncStage.event()); asyncStage.whenComplete((value, e) -> { if ( e != null ) @@ -185,7 +188,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T { try { - modelStage.complete(model.serializer().deserialize(value)); + modelStage.complete(modelSpec.serializer().deserialize(value)); } catch ( Exception deserializeException ) { @@ -206,9 +209,9 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T public AsyncStage<Stat> update(T item, int version) { long dirtyZxid = getDirtyZxid(); - byte[] bytes = model.serializer().serialize(item); + byte[] bytes = modelSpec.serializer().serialize(item); AsyncPathAndBytesable<AsyncStage<Stat>> next = isCompressed() ? 
dslClient.setData().compressedWithVersion(version) : dslClient.setData(); - AsyncStage<Stat> asyncStage = next.forPath(model.path().fullPath(), bytes); + AsyncStage<Stat> asyncStage = next.forPath(modelSpec.path().fullPath(), bytes); ModelStage<Stat> modelStage = new ModelStage<>(asyncStage.event()); markDirtyCompleter(dirtyZxid, asyncStage, modelStage); return modelStage; @@ -227,7 +230,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } return result; } - return watchableClient.checkExists().forPath(model.path().fullPath()); + return watchableClient.checkExists().forPath(modelSpec.path().fullPath()); } @Override @@ -240,7 +243,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T public AsyncStage<Void> delete(int version) { long dirtyZxid = getDirtyZxid(); - AsyncStage<Void> asyncStage = dslClient.delete().withVersion(-1).forPath(model.path().fullPath()); + AsyncStage<Void> asyncStage = dslClient.delete().withVersion(-1).forPath(modelSpec.path().fullPath()); ModelStage<Void> modelStage = new ModelStage<>(asyncStage.event()); markDirtyCompleter(dirtyZxid, asyncStage, modelStage); return modelStage; @@ -249,7 +252,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public AsyncStage<List<ZPath>> getChildren() { - AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(model.path().fullPath()); + AsyncStage<List<String>> asyncStage = watchableClient.getChildren().forPath(modelSpec.path().fullPath()); ModelStage<List<ZPath>> modelStage = new ModelStage<>(asyncStage.event()); asyncStage.whenComplete((children, e) -> { if ( e != null ) @@ -258,7 +261,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T } else { - modelStage.complete(children.stream().map(child -> model.path().at(child)).collect(Collectors.toList())); + modelStage.complete(children.stream().map(child -> 
modelSpec.path().at(child)).collect(Collectors.toList())); } }); return modelStage; @@ -267,7 +270,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T @Override public ModeledCuratorFramework<T> at(String child) { - CuratorModelSpec<T> childModel = model.at(child); + CuratorModelSpec<T> childModel = modelSpec.at(child); CachingImpl<T> newCaching = (caching != null) ? caching.at(child) : null; return new ModeledCuratorFrameworkImpl<>( client, @@ -287,6 +290,62 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T return createOptions.contains(CreateOption.compress); } + @Override + public CuratorOp createOp(T model) + { + return client.transactionOp() + .create() + .withOptions(modelSpec.createMode(), modelSpec.aclList(), modelSpec.createOptions().contains(CreateOption.compress)) + .forPath(modelSpec.path().fullPath(), modelSpec.serializer().serialize(model)); + } + + @Override + public CuratorOp updateOp(T model) + { + return updateOp(model, -1); + } + + @Override + public CuratorOp updateOp(T model, int version) + { + AsyncTransactionSetDataBuilder builder = client.transactionOp().setData(); + if ( isCompressed() ) + { + return builder.withVersionCompressed(version).forPath(modelSpec.path().fullPath(), modelSpec.serializer().serialize(model)); + } + return builder.withVersion(version).forPath(modelSpec.path().fullPath(), modelSpec.serializer().serialize(model)); + } + + @Override + public CuratorOp deleteOp() + { + return deleteOp(-1); + } + + @Override + public CuratorOp deleteOp(int version) + { + return client.transactionOp().delete().withVersion(version).forPath(modelSpec.path().fullPath()); + } + + @Override + public CuratorOp checkExistsOp() + { + return checkExistsOp(-1); + } + + @Override + public CuratorOp checkExistsOp(int version) + { + return client.transactionOp().check().withVersion(version).forPath(modelSpec.path().fullPath()); + } + + @Override + public 
AsyncStage<List<CuratorTransactionResult>> inTransaction(List<CuratorOp> operations) + { + return client.transaction().forOperations(operations); + } + private <U> void markDirtyCompleter(long dirtyZxid, AsyncStage<U> asyncStage, ModelStage<U> modelStage) { asyncStage.whenComplete((value, e) -> { @@ -307,7 +366,7 @@ public class ModeledCuratorFrameworkImpl<T> implements ModeledCuratorFramework<T private boolean isCompressed() { - return model.createOptions().contains(CreateOption.compress); + return modelSpec.createOptions().contains(CreateOption.compress); } private ModeledCachedNode<T> getCached() http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledCuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledCuratorFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledCuratorFramework.java index da63d37..3d98c1a 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledCuratorFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledCuratorFramework.java @@ -74,7 +74,7 @@ public class TestModeledCuratorFramework extends CompletableBaseClassForTests TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); TestModel rawModel2 = new TestModel("Wayne", "Rooney", "Old Trafford", 10, BigInteger.valueOf(1)); ModeledCuratorFramework<TestModel> client = ModeledCuratorFramework.wrap(rawClient, modelSpec); - AsyncStage<String> stage = client.create(rawModel); + AsyncStage<String> stage = client.set(rawModel); Assert.assertNull(stage.event()); complete(stage, (s, e) -> Assert.assertNotNull(s)); complete(client.read(), (model, e) -> Assert.assertEquals(model, rawModel)); @@ -89,7 +89,7 @@ public class 
TestModeledCuratorFramework extends CompletableBaseClassForTests { TestNewerModel rawNewModel = new TestNewerModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1), 100); ModeledCuratorFramework<TestNewerModel> clientForNew = ModeledCuratorFramework.wrap(rawClient, newModelSpec); - complete(clientForNew.create(rawNewModel), (s, e) -> Assert.assertNotNull(s)); + complete(clientForNew.set(rawNewModel), (s, e) -> Assert.assertNotNull(s)); ModeledCuratorFramework<TestModel> clientForOld = ModeledCuratorFramework.wrap(rawClient, modelSpec); complete(clientForOld.read(), (model, e) -> Assert.assertTrue(rawNewModel.equalsOld(model))); @@ -103,7 +103,7 @@ public class TestModeledCuratorFramework extends CompletableBaseClassForTests client.checkExists().event().whenComplete((event, ex) -> latch.countDown()); timing.sleepABit(); Assert.assertEquals(latch.getCount(), 1); - client.create(new TestModel()); + client.set(new TestModel()); Assert.assertTrue(timing.awaitLatch(latch)); } @@ -112,9 +112,9 @@ public class TestModeledCuratorFramework extends CompletableBaseClassForTests { TestModel model = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); ModeledCuratorFramework<TestModel> client = ModeledCuratorFramework.builder(rawClient, modelSpec).build(); - complete(client.at("one").create(model)); - complete(client.at("two").create(model)); - complete(client.at("three").create(model)); + complete(client.at("one").set(model)); + complete(client.at("two").set(model)); + complete(client.at("three").set(model)); Set<ZPath> expected = Sets.newHashSet(path.at("one"), path.at("two"), path.at("three")); complete(client.getChildren(), (children, e) -> Assert.assertEquals(Sets.newHashSet(children), expected)); http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java ---------------------------------------------------------------------- 
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java index 06d6834..4954ae2 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledCuratorFramework.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.async.modeled.details; -import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; @@ -29,7 +28,6 @@ import org.apache.curator.x.async.modeled.JacksonModelSerializer; import org.apache.curator.x.async.modeled.ModelSerializer; import org.apache.curator.x.async.modeled.ModeledCuratorFramework; import org.apache.curator.x.async.modeled.ZPath; -import org.apache.curator.x.async.modeled.caching.CachingOption; import org.apache.curator.x.async.modeled.models.TestSimpleModel; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -75,7 +73,7 @@ public class TestCachedModeledCuratorFramework extends CompletableBaseClassForTe complete(client.read()); Assert.assertEquals(counter.get(), 0); - complete(client.create(new TestSimpleModel("test", 10))); + complete(client.set(new TestSimpleModel("test", 10))); Assert.assertEquals(counter.get(), 0); timing.sleepABit(); @@ -84,7 +82,7 @@ public class TestCachedModeledCuratorFramework extends CompletableBaseClassForTe Assert.assertEquals(counter.get(), 1); counter.set(0); - complete(client.create(new TestSimpleModel("test2", 20))); + complete(client.set(new TestSimpleModel("test2", 20))); Assert.assertEquals(counter.get(), 0); timing.sleepABit(); 
http://git-wip-us.apache.org/repos/asf/curator/blob/11cb9703/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java index aacc4a9..0a32e9a 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/recipes/TestModeledCaches.java @@ -87,7 +87,7 @@ public class TestModeledCaches extends CompletableBaseClassForTests TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); Stat stat = new Stat(); - modeled.create(model1, stat); + modeled.set(model1, stat); ModeledCacheEvent<TestModel> event = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); Assert.assertNotNull(event); Assert.assertEquals(event.getType(), ModeledCacheEventType.NODE_UPDATED); @@ -126,7 +126,7 @@ public class TestModeledCaches extends CompletableBaseClassForTests TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); TestModel model3 = new TestModel("g", "h", "i", 100, BigInteger.ZERO); - modeled.at("1").create(model1).thenApply(__ -> modeled.at("2").create(model2)); + modeled.at("1").set(model1).thenApply(__ -> modeled.at("2").set(model2)); ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); Assert.assertNotNull(event1); @@ -172,7 +172,7 @@ public class TestModeledCaches extends CompletableBaseClassForTests TestModel model2 = new TestModel("d", "e", "f", 10, BigInteger.ONE); TestModel model3 = new TestModel("g", "h", "i", 100, BigInteger.ZERO); - modeled.at("1").create(model1).thenApply(__ -> 
modeled.at("1").at("2").create(model2).thenApply(___ -> modeled.at("1").at("2").at("3").create(model3))); + modeled.at("1").set(model1).thenApply(__ -> modeled.at("1").at("2").set(model2).thenApply(___ -> modeled.at("1").at("2").at("3").set(model3))); ModeledCacheEvent<TestModel> event1 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); ModeledCacheEvent<TestModel> event2 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); ModeledCacheEvent<TestModel> event3 = events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
