Repository: curator Updated Branches: refs/heads/CURATOR-397 1fcb63a5c -> 713bf4670
Work-in-progress. Using example sub/pub to flesh out issues and as an integration test. Working on bugs found Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/713bf467 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/713bf467 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/713bf467 Branch: refs/heads/CURATOR-397 Commit: 713bf4670e494a74b083431929b86dbfe6bd7f75 Parents: 1fcb63a Author: randgalt <[email protected]> Authored: Tue May 2 20:45:23 2017 -0500 Committer: randgalt <[email protected]> Committed: Tue May 2 20:45:23 2017 -0500 ---------------------------------------------------------------------- curator-examples/pom.xml | 1 - .../java/modeled/ModeledCuratorExamples.java | 6 +- .../src/main/java/modeled/PersonModelSpec.java | 8 +- .../main/java/pubsub/InstanceSubscriber.java | 36 ---- .../src/main/java/pubsub/MessageSubscriber.java | 37 ---- .../src/main/java/pubsub/Publisher.java | 48 +++-- .../src/main/java/pubsub/SubPubTest.java | 180 ++++++++++++++++++- .../src/main/java/pubsub/Subscriber.java | 39 +++- .../src/main/java/pubsub/builders/Clients.java | 12 +- .../main/java/pubsub/builders/ModelSpecs.java | 27 ++- .../src/main/java/pubsub/builders/Paths.java | 5 +- .../java/pubsub/messages/LocationAvailable.java | 11 ++ .../main/java/pubsub/messages/UserCreated.java | 11 ++ .../src/main/java/pubsub/models/Instance.java | 6 + .../src/main/java/pubsub/models/Message.java | 11 ++ .../src/main/resources/log4j.properties | 24 +++ .../api/AsyncTransactionCreateBuilder.java | 14 ++ .../x/async/details/AsyncTransactionOpImpl.java | 13 +- .../curator/x/async/modeled/ModelSpec.java | 13 +- .../x/async/modeled/ModelSpecBuilder.java | 19 +- .../x/async/modeled/ModeledFramework.java | 18 +- .../async/modeled/ModeledFrameworkBuilder.java | 8 +- .../details/CachedModeledFrameworkImpl.java | 8 +- .../x/async/modeled/details/ModelSpecImpl.java | 35 ++-- .../async/modeled/details/ModeledCacheImpl.java | 18 +- .../modeled/details/ModeledFrameworkImpl.java | 16 +- .../x/async/modeled/details/ZPathImpl.java | 8 +- .../modeled/typed/TypedModeledFramework.java | 6 +- .../modeled/typed/TypedModeledFramework10.java | 6 +- .../modeled/typed/TypedModeledFramework2.java | 6 +- .../modeled/typed/TypedModeledFramework3.java | 6 +- .../modeled/typed/TypedModeledFramework4.java | 6 +- .../modeled/typed/TypedModeledFramework5.java | 6 +- .../modeled/typed/TypedModeledFramework6.java | 6 +- .../modeled/typed/TypedModeledFramework7.java | 6 +- .../modeled/typed/TypedModeledFramework8.java | 6 +- .../modeled/typed/TypedModeledFramework9.java | 6 +- .../x/async/modeled/TestModeledFramework.java | 11 +- .../details/TestCachedModeledFramework.java | 3 +- 39 files changed, 508 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/pom.xml ---------------------------------------------------------------------- diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml index 6d8be77..cc65570 100644 --- a/curator-examples/pom.xml +++ b/curator-examples/pom.xml @@ -62,7 +62,6 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java b/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java index 7bfc131..2259b0f 100644 --- a/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java +++ b/curator-examples/src/main/java/modeled/ModeledCuratorExamples.java @@ -18,16 +18,16 @@ */ package modeled; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.JacksonModelSerializer; +import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ZPath; import java.util.function.Consumer; public class ModeledCuratorExamples { - public static ModeledFramework<PersonModel> wrap(CuratorFramework client) + public static ModeledFramework<PersonModel> wrap(AsyncCuratorFramework client) { JacksonModelSerializer<PersonModel> serializer = JacksonModelSerializer.build(PersonModel.class); http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/modeled/PersonModelSpec.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/modeled/PersonModelSpec.java b/curator-examples/src/main/java/modeled/PersonModelSpec.java index 8230f6e..97bbe48 100644 --- a/curator-examples/src/main/java/modeled/PersonModelSpec.java +++ b/curator-examples/src/main/java/modeled/PersonModelSpec.java @@ -18,18 +18,18 @@ */ package modeled; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.JacksonModelSerializer; +import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ZPath; public class PersonModelSpec { - private final CuratorFramework client; + private final AsyncCuratorFramework client; private final ModelSpec<PersonModel> modelSpec; - public PersonModelSpec(CuratorFramework client) + public PersonModelSpec(AsyncCuratorFramework client) { this.client = client; http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/InstanceSubscriber.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/InstanceSubscriber.java b/curator-examples/src/main/java/pubsub/InstanceSubscriber.java deleted file mode 100644 index d820014..0000000 --- a/curator-examples/src/main/java/pubsub/InstanceSubscriber.java +++ /dev/null @@ -1,36 +0,0 @@ -package pubsub; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; -import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import pubsub.builders.Clients; -import pubsub.models.Instance; -import pubsub.models.InstanceType; -import java.io.Closeable; - -public class InstanceSubscriber implements Closeable -{ - private final CachedModeledFramework<Instance> client; - - public InstanceSubscriber(CuratorFramework client, InstanceType instanceType) - { - this.client = Clients.instanceClient.resolved(client, instanceType).cached(); - } - - public Listenable<ModeledCacheListener<Instance>> listenable() - { - return client.getCache().listenable(); - } - - public void start() - { - client.start(); - } - - @Override - public void close() - { - client.close(); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/MessageSubscriber.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/MessageSubscriber.java b/curator-examples/src/main/java/pubsub/MessageSubscriber.java deleted file mode 100644 index 64ba867..0000000 --- a/curator-examples/src/main/java/pubsub/MessageSubscriber.java +++ /dev/null @@ -1,37 +0,0 @@ -package pubsub; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; -import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; -import pubsub.builders.Clients; -import pubsub.models.Group; -import pubsub.models.Message; -import pubsub.models.Priority; -import java.io.Closeable; - -public class MessageSubscriber implements Closeable -{ - private final CachedModeledFramework<Message> client; - - public MessageSubscriber(CuratorFramework client, Group group, Priority priority) - { - this.client = Clients.messageClient.resolved(client, group, priority).cached(); - } - - public Listenable<ModeledCacheListener<Message>> listenable() - { - return client.getCache().listenable(); - } - - public void start() - { - client.start(); - } - - @Override - public void close() - { - client.close(); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/Publisher.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/Publisher.java b/curator-examples/src/main/java/pubsub/Publisher.java index b86d2a2..53fad3e 100644 --- a/curator-examples/src/main/java/pubsub/Publisher.java +++ b/curator-examples/src/main/java/pubsub/Publisher.java @@ -1,35 +1,37 @@ package pubsub; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; import pubsub.models.Group; import pubsub.models.Instance; import pubsub.models.Message; +import pubsub.models.Priority; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -import static pubsub.builders.Clients.instanceClient; -import static pubsub.builders.Clients.messageClient; +import static pubsub.builders.Clients.*; public class Publisher { private final Logger log = LoggerFactory.getLogger(getClass()); private final AsyncCuratorFramework client; - public Publisher(CuratorFramework client) + public Publisher(AsyncCuratorFramework client) { - this.client = AsyncCuratorFramework.wrap(Objects.requireNonNull(client, "client cannot be null")); + this.client = Objects.requireNonNull(client, "client cannot be null"); } public void publishInstance(Instance instance) { ModeledFramework<Instance> resolvedClient = instanceClient - .resolved(client.unwrap(), instance.getType()) + .resolved(client, instance.getType()) .resolved(instance); resolvedClient.set(instance).exceptionally(e -> { log.error("Could not publish instance: " + instance, e); @@ -41,7 +43,7 @@ public class Publisher { List<CuratorOp> operations = instances.stream() .map(instance -> instanceClient - .resolved(client.unwrap(), instance.getType()) + .resolved(client, instance.getType()) .resolved(instance) .createOp(instance) ) @@ -52,10 +54,30 @@ public class Publisher }); } - public void publishMessage(Message message, Group group) + public void publishLocationAvailable(Group group, LocationAvailable message) { - ModeledFramework<Message> resolvedClient = messageClient - .resolved(client.unwrap(), group, message.getPriority()) + publishMessage(locationAvailableClient, group, message); + } + + public void publishUserCreated(Group group, UserCreated message) + { + publishMessage(userCreatedClient, group, message); + } + + public void publishLocationsAvailable(Group group, List<LocationAvailable> messages) + { + publishMessages(locationAvailableClient, group, messages); + } + + public void publishUsersCreated(Group group, List<UserCreated> messages) + { + publishMessages(userCreatedClient, group, messages); + } + + public <T extends Message> void publishMessage(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, T message) + { + ModeledFramework<T> resolvedClient = typedClient + .resolved(client, group, message.getPriority()) .resolved(message); resolvedClient.set(message).exceptionally(e -> { log.error("Could not publish message: " + message, e); @@ -63,11 +85,11 @@ public class Publisher }); } - public void publishMessages(List<Message> messages, Group group) + public <T extends Message> void publishMessages(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, List<T> messages) { List<CuratorOp> operations = messages.stream() - .map(message -> messageClient - .resolved(client.unwrap(), group, message.getPriority()) + .map(message -> typedClient + .resolved(client, group, message.getPriority()) .resolved(message) .createOp(message) ) http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/SubPubTest.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/SubPubTest.java b/curator-examples/src/main/java/pubsub/SubPubTest.java index 0ba638f..48d9d0b 100644 --- a/curator-examples/src/main/java/pubsub/SubPubTest.java +++ b/curator-examples/src/main/java/pubsub/SubPubTest.java @@ -1,18 +1,192 @@ package pubsub; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; +import pubsub.models.Group; +import pubsub.models.Instance; +import pubsub.models.InstanceType; +import pubsub.models.Priority; +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; -public class SubPubTest +public class SubPubTest implements Closeable { private final TestingServer testingServer; + private final AsyncCuratorFramework client; + private final Publisher publisher; + private final ScheduledExecutorService executorService; + private final List<CachedModeledFramework<Instance>> instanceSubscribers = new ArrayList<>(); + private final List<CachedModeledFramework<LocationAvailable>> locationAvailableSubscribers = new ArrayList<>(); + private final List<CachedModeledFramework<UserCreated>> userCreatedSubscribers = new ArrayList<>(); - public static void main(String[] args) - { + private static final AtomicLong id = new AtomicLong(1); + + private static final Group[] groups = {new Group("main"), new Group("admin")}; + private static final String[] hostnames = {"host1", "host2", "host3"}; + private static final Integer[] ports = {80, 443, 9999}; + private static final String[] locations = {"dc1", "dc2", "eu", "us"}; + private static final Duration[] durations = {Duration.ofSeconds(1), Duration.ofMinutes(1), Duration.ofHours(1)}; + private static final String[] positions = {"worker", "manager", "executive"}; + public static void main(String[] args) throws Exception + { + try ( SubPubTest subPubTest = new SubPubTest() ) + { + subPubTest.start(); + TimeUnit.MINUTES.sleep(1); + } } public SubPubTest() throws Exception { this.testingServer = new TestingServer(); + client = AsyncCuratorFramework.wrap(CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1))); + publisher = new Publisher(client); + executorService = Executors.newSingleThreadScheduledExecutor(); + } + + public void start() + { + client.unwrap().start(); + + Subscriber subscriber = new Subscriber(client); + instanceSubscribers.addAll( + Arrays.stream(InstanceType.values()) + .map(subscriber::startInstanceSubscriber) + .collect(Collectors.toList()) + ); + + locationAvailableSubscribers.addAll( + Arrays.stream(Priority.values()) + .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startLocationAvailableSubscriber(group, priority))) + .collect(Collectors.toList()) + ); + + userCreatedSubscribers.addAll( + Arrays.stream(Priority.values()) + .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startUserCreatedSubscriber(group, priority))) + .collect(Collectors.toList()) + ); + + instanceSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); + locationAvailableSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); + userCreatedSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); + + executorService.scheduleAtFixedRate(this::publishSomething, 1, 1, TimeUnit.SECONDS); + } + + @Override + public void close() throws IOException + { + executorService.shutdownNow(); + try + { + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + catch ( InterruptedException ignore ) + { + Thread.currentThread().interrupt(); + } + + userCreatedSubscribers.forEach(CachedModeledFramework::close); + locationAvailableSubscribers.forEach(CachedModeledFramework::close); + instanceSubscribers.forEach(CachedModeledFramework::close); + client.unwrap().close(); + testingServer.close(); + } + + private void publishSomething() + { + switch ( ThreadLocalRandom.current().nextInt(5) ) + { + case 0: + { + Instance instance = new Instance(nextId(), random(InstanceType.values()), random(hostnames), random(ports)); + System.out.println("Publishing instance"); + publisher.publishInstance(instance); + break; + } + + case 1: + { + List<Instance> instances = IntStream.range(1, 10) + .mapToObj(__ -> new Instance(nextId(), random(InstanceType.values()), random(hostnames), random(ports))) + .collect(Collectors.toList()); + System.out.println("Publishing instances"); + publisher.publishInstances(instances); + break; + } + + case 2: + { + LocationAvailable locationAvailable = new LocationAvailable(nextId(), random(Priority.values()), random(locations), random(durations)); + System.out.println("Publishing locationAvailable"); + publisher.publishLocationAvailable(random(groups), locationAvailable); + break; + } + + case 3: + { + List<LocationAvailable> locationsAvailable = IntStream.range(1, 10) + .mapToObj(__ -> new LocationAvailable(nextId(), random(Priority.values()), random(locations), random(durations))) + .collect(Collectors.toList()); + System.out.println("Publishing locationsAvailable"); + publisher.publishLocationsAvailable(random(groups), locationsAvailable); + break; + } + + case 4: + { + UserCreated userCreated = new UserCreated(nextId(), random(Priority.values()), random(locations), random(positions)); + System.out.println("Publishing userCreated"); + publisher.publishUserCreated(random(groups), userCreated); + break; + } + + case 5: + { + List<UserCreated> usersCreated = IntStream.range(1, 10) + .mapToObj(__ -> new UserCreated(nextId(), random(Priority.values()), random(locations), random(positions))) + .collect(Collectors.toList()); + System.out.println("Publishing usersCreated"); + publisher.publishUsersCreated(random(groups), usersCreated); + break; + } + } + } + + private <T> ModeledCacheListener<T> generalListener() + { + return (type, path, stat, model) -> System.out.println(String.format("Subscribed %s @ %s", model.getClass().getSimpleName(), path)); + } + + @SafeVarargs + private final <T> T random(T... tab) + { + int index = ThreadLocalRandom.current().nextInt(tab.length); + return tab[index]; + } + + private String nextId() + { + return Long.toString(id.getAndIncrement()); } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/Subscriber.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/Subscriber.java b/curator-examples/src/main/java/pubsub/Subscriber.java index ea6de01..29911a2 100644 --- a/curator-examples/src/main/java/pubsub/Subscriber.java +++ b/curator-examples/src/main/java/pubsub/Subscriber.java @@ -1,15 +1,48 @@ package pubsub; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; +import pubsub.models.Group; +import pubsub.models.Instance; +import pubsub.models.InstanceType; +import pubsub.models.Message; +import pubsub.models.Priority; + +import static pubsub.builders.Clients.*; public class Subscriber { - private final CuratorFramework client; + private final AsyncCuratorFramework client; - public Subscriber(CuratorFramework client) + public Subscriber(AsyncCuratorFramework client) { this.client = client; } + + public CachedModeledFramework<LocationAvailable> startLocationAvailableSubscriber(Group group, Priority priority) + { + return startSubscriber(locationAvailableClient, group, priority); + } + + public CachedModeledFramework<UserCreated> startUserCreatedSubscriber(Group group, Priority priority) + { + return startSubscriber(userCreatedClient, group, priority); + } + + public CachedModeledFramework<Instance> startInstanceSubscriber(InstanceType instanceType) + { + CachedModeledFramework<Instance> resolved = instanceClient.resolved(client, instanceType).cached(); + resolved.start(); + return resolved; + } + + public <T extends Message> CachedModeledFramework<T> startSubscriber(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, Priority priority) + { + CachedModeledFramework<T> resolved = typedClient.resolved(client, group, priority).cached(); + resolved.start(); + return resolved; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/builders/Clients.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/builders/Clients.java b/curator-examples/src/main/java/pubsub/builders/Clients.java index e2b1c89..f8569aa 100644 --- a/curator-examples/src/main/java/pubsub/builders/Clients.java +++ b/curator-examples/src/main/java/pubsub/builders/Clients.java @@ -3,17 +3,23 @@ package pubsub.builders; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.typed.TypedModeledFramework; import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; import pubsub.models.Group; import pubsub.models.Instance; import pubsub.models.InstanceType; -import pubsub.models.Message; import pubsub.models.Priority; public class Clients { - public static final TypedModeledFramework2<Message, Group, Priority> messageClient = TypedModeledFramework2.from( + public static final TypedModeledFramework2<LocationAvailable, Group, Priority> locationAvailableClient = TypedModeledFramework2.from( ModeledFramework.builder(), - ModelSpecs.messageModelSpec + ModelSpecs.locationAvailableModelSpec + ); + + public static final TypedModeledFramework2<UserCreated, Group, Priority> userCreatedClient = TypedModeledFramework2.from( + ModeledFramework.builder(), + ModelSpecs.userCreatedModelSpec ); public static final TypedModeledFramework<Instance, InstanceType> instanceClient = TypedModeledFramework.from( http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/builders/ModelSpecs.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/builders/ModelSpecs.java b/curator-examples/src/main/java/pubsub/builders/ModelSpecs.java index 83cdfe4..dff4e21 100644 --- a/curator-examples/src/main/java/pubsub/builders/ModelSpecs.java +++ b/curator-examples/src/main/java/pubsub/builders/ModelSpecs.java @@ -2,26 +2,43 @@ package pubsub.builders; import org.apache.curator.x.async.modeled.JacksonModelSerializer; import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModelSpecBuilder; import org.apache.curator.x.async.modeled.typed.TypedModelSpec; import org.apache.curator.x.async.modeled.typed.TypedModelSpec2; +import org.apache.zookeeper.CreateMode; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; import pubsub.models.Group; import pubsub.models.Instance; import pubsub.models.InstanceType; -import pubsub.models.Message; import pubsub.models.Priority; +import java.util.concurrent.TimeUnit; public class ModelSpecs { - public static final TypedModelSpec2<Message, Group, Priority> messageModelSpec = TypedModelSpec2.from( - ModelSpec.builder(JacksonModelSerializer.build(Message.class)), - Paths.messagesPath + public static final TypedModelSpec2<LocationAvailable, Group, Priority> locationAvailableModelSpec = TypedModelSpec2.from( + builder(LocationAvailable.class), + Paths.locationAvailablePath + ); + + public static final TypedModelSpec2<UserCreated, Group, Priority> userCreatedModelSpec = TypedModelSpec2.from( + builder(UserCreated.class), + Paths.userCreatedPath ); public static final TypedModelSpec<Instance, InstanceType> instanceModelSpec = TypedModelSpec.from( - ModelSpec.builder(JacksonModelSerializer.build(Instance.class)), + builder(Instance.class), Paths.instancesPath ); + private static <T> ModelSpecBuilder<T> builder(Class<T> clazz) + { + return ModelSpec.builder(JacksonModelSerializer.build(clazz)) + .withTtl(TimeUnit.MINUTES.toMillis(10)) + .withCreateMode(CreateMode.PERSISTENT_WITH_TTL) + ; + } + private ModelSpecs() { } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/builders/Paths.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/builders/Paths.java b/curator-examples/src/main/java/pubsub/builders/Paths.java index 0db5138..6b2dd6f 100644 --- a/curator-examples/src/main/java/pubsub/builders/Paths.java +++ b/curator-examples/src/main/java/pubsub/builders/Paths.java @@ -10,7 +10,10 @@ public class Paths { private static final String basePath = "/root/pubsub"; - public static final TypedZPath2<Group, Priority> messagesPath = TypedZPath2.from(basePath + "/messages/{id}/{id}"); + public static final TypedZPath2<Group, Priority> locationAvailablePath = TypedZPath2.from(basePath + "/messages/locations/{id}/{id}"); + + public static final TypedZPath2<Group, Priority> userCreatedPath = TypedZPath2.from(basePath + "/messages/users/{id}/{id}"); + public static final TypedZPath<InstanceType> instancesPath = TypedZPath.from(basePath + "/instances/{id}"); private Paths() http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/messages/LocationAvailable.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/messages/LocationAvailable.java b/curator-examples/src/main/java/pubsub/messages/LocationAvailable.java index 503e4ed..fbfd64e 100644 --- a/curator-examples/src/main/java/pubsub/messages/LocationAvailable.java +++ b/curator-examples/src/main/java/pubsub/messages/LocationAvailable.java @@ -10,6 +10,11 @@ public class LocationAvailable extends Message private final String name; private final Duration availableUntil; + public LocationAvailable() + { + this(Priority.low, "", Duration.ZERO); + } + public LocationAvailable(Priority priority, String name, Duration availableUntil) { super(priority); @@ -23,4 +28,10 @@ public class LocationAvailable extends Message this.name = Objects.requireNonNull(name, "name cannot be null"); this.availableUntil = Objects.requireNonNull(availableUntil, "availableUntil cannot be null"); } + + @Override + public String toString() + { + return "LocationAvailable{" + "name='" + name + '\'' + ", availableUntil=" + availableUntil + "} " + super.toString(); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/messages/UserCreated.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/messages/UserCreated.java b/curator-examples/src/main/java/pubsub/messages/UserCreated.java index da7e639..10ed48e 100644 --- a/curator-examples/src/main/java/pubsub/messages/UserCreated.java +++ b/curator-examples/src/main/java/pubsub/messages/UserCreated.java @@ -9,6 +9,11 @@ public class UserCreated extends Message private final String name; private final String position; + public UserCreated() + { + this(Priority.low, "",""); + } + public UserCreated(Priority priority, String name, String position) { super(priority); @@ -32,4 +37,10 @@ public class UserCreated extends Message { return position; } + + @Override + public String toString() + { + return "UserCreated{" + "name='" + name + '\'' + ", position='" + position + '\'' + "} " + super.toString(); + } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/models/Instance.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/models/Instance.java b/curator-examples/src/main/java/pubsub/models/Instance.java index d674317..4fa62bd 100644 --- a/curator-examples/src/main/java/pubsub/models/Instance.java +++ b/curator-examples/src/main/java/pubsub/models/Instance.java @@ -49,4 +49,10 @@ public class Instance implements NodeName { return id; } + + @Override + public String toString() + { + return "Instance{" + "id='" + id + '\'' + ", type=" + type + ", hostname='" + hostname + '\'' + ", port=" + port + '}'; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/java/pubsub/models/Message.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/models/Message.java b/curator-examples/src/main/java/pubsub/models/Message.java index 27a0268..4151684 100644 --- a/curator-examples/src/main/java/pubsub/models/Message.java +++ b/curator-examples/src/main/java/pubsub/models/Message.java @@ -9,6 +9,11 @@ public abstract class Message implements NodeName private final String id; private final Priority priority; + protected Message() + { + this(UUID.randomUUID().toString(), Priority.low); + } + protected Message(Priority priority) { this(UUID.randomUUID().toString(), priority); @@ -35,4 +40,10 @@ public abstract class Message implements NodeName { return id; } + + @Override + public String toString() + { + return "Message{" + "id='" + id + '\'' + ", priority=" + priority + '}'; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/resources/log4j.properties b/curator-examples/src/main/resources/log4j.properties new file mode 100644 index 0000000..0405670 --- /dev/null +++ b/curator-examples/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=ERROR, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java index 81da5c0..e2eabac 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java @@ -63,4 +63,18 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable<Cur * @return this */ AsyncPathAndBytesable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed); + + /** + * Specify mode, acl list and compression + * + * @param createMode mode + * @param aclList the ACL list to use + * @param compressed true to compress + * @param ttl node TTL or -1 + * @see #withMode(org.apache.zookeeper.CreateMode) + * @see #withACL(java.util.List) + * @see #compressed() + * @return this + */ + AsyncPathAndBytesable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed, long ttl); } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index 0be720f..0280a7e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -54,6 +54,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp private List<ACL> aclList = null; private CreateMode createMode = CreateMode.PERSISTENT; private boolean compressed = false; + private long ttl = -1; @Override public AsyncPathAndBytesable<CuratorOp> withMode(CreateMode createMode) @@ -86,6 +87,16 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp } @Override + public AsyncPathAndBytesable<CuratorOp> withOptions(CreateMode createMode, List<ACL> aclList, boolean compressed, long ttl) + { + this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); + this.aclList = aclList; + this.compressed = compressed; + this.ttl = ttl; + return this; + } + + @Override public CuratorOp forPath(String path, byte[] data) { return internalForPath(path, data, true); @@ -99,7 +110,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp private CuratorOp internalForPath(String path, byte[] data, boolean useData) { - TransactionCreateBuilder<CuratorOp> builder1 = client.transactionOp().create(); + TransactionCreateBuilder<CuratorOp> builder1 = client.transactionOp().create(); // TODO - handle TTL ACLCreateModePathAndBytesable<CuratorOp> builder2 = compressed ? builder1.compressed() : builder1; PathAndBytesable<CuratorOp> builder3 = builder2.withACL(aclList); try http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpec.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpec.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpec.java index cb0f534..197405a 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpec.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpec.java @@ -82,9 +82,9 @@ public interface ModelSpec<T> ModelSpec<T> at(ZPath path); /** - * Return a new CuratorModel instance with all the same options but using the - * {@link ModelSpecBuilder#nodeName} functor - * to generate the child node's name + * Return a new CuratorModel instance with all the same options but by calling <code>toString()</code> + * on the model or, if it implements {@link org.apache.curator.x.async.modeled.NodeName}, it's + * <code>nodeName()</code> method to generate the child node's name. * * @param model model to use to generate the name * @return new Modeled Spec instance @@ -134,6 +134,13 @@ public interface ModelSpec<T> Set<DeleteOption> deleteOptions(); /** + * Return the TTL to use or -1 + * + * @return ttl + */ + long ttl(); + + /** * Return a Curator schema that validates ZNodes at this model's * path using this model's values * http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpecBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpecBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpecBuilder.java index 2ac2501..f6a2a51 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpecBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSpecBuilder.java @@ -37,6 +37,7 @@ public class ModelSpecBuilder<T> private List<ACL> aclList = Collections.emptyList(); private Set<CreateOption> createOptions = Collections.emptySet(); private Set<DeleteOption> deleteOptions = Collections.emptySet(); + private long ttl = -1; /** * Build a new ModelSpec instance @@ -45,7 +46,7 @@ public class ModelSpecBuilder<T> */ public ModelSpec<T> build() { - return new ModelSpecImpl<>(path, serializer, createMode, aclList, createOptions, deleteOptions); + return new ModelSpecImpl<>(path, serializer, createMode, aclList, createOptions, deleteOptions, ttl); } /** @@ -61,6 +62,22 @@ public class ModelSpecBuilder<T> } /** + * Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or + * {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. If + * the znode has not been modified within the given TTL, it will be deleted once it has no + * children. The TTL unit is milliseconds and must be greater than 0 and less than or equal to + * EphemeralType.MAX_TTL. + * + * @param ttl the ttl + * @return this for chaining + */ + public ModelSpecBuilder<T> withTtl(long ttl) + { + this.ttl = ttl; + return this; + } + + /** * Use the given aclList for create operations on the Modeled Curator's ZNode * * @param aclList ACLs http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java index 7b88625..ed575ff 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFramework.java @@ -18,9 +18,9 @@ */ package org.apache.curator.x.async.modeled; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.zookeeper.data.Stat; @@ -35,7 +35,7 @@ public interface ModeledFramework<T> * @param model the model * @return new Modeled Curator instance */ - static <T> ModeledFramework<T> wrap(CuratorFramework client, ModelSpec<T> model) + static <T> ModeledFramework<T> wrap(AsyncCuratorFramework client, ModelSpec<T> model) { return builder(client, model).build(); } @@ -47,14 +47,14 @@ public interface ModeledFramework<T> * @param model the model * @return builder */ - static <T> ModeledFrameworkBuilder<T> builder(CuratorFramework client, ModelSpec<T> model) + static <T> ModeledFrameworkBuilder<T> builder(AsyncCuratorFramework client, ModelSpec<T> model) { return new ModeledFrameworkBuilder<>(client, model); } /** * Start a new ModeledFrameworkBuilder. A client and model must be provided prior to the instance - * being built via {@link org.apache.curator.x.async.modeled.ModeledFrameworkBuilder#withClient(org.apache.curator.framework.CuratorFramework)} + * being built via {@link org.apache.curator.x.async.modeled.ModeledFrameworkBuilder#withClient(org.apache.curator.x.async.AsyncCuratorFramework)} * and {@link org.apache.curator.x.async.modeled.ModeledFrameworkBuilder#withModelSpec(ModelSpec)} * * @return builder @@ -75,12 +75,12 @@ public interface ModeledFramework<T> CachedModeledFramework<T> cached(); /** - * Returns the client that was originally passed to {@link #wrap(org.apache.curator.framework.CuratorFramework, ModelSpec)} or + * Returns the client that was originally passed to {@link #wrap(org.apache.curator.x.async.AsyncCuratorFramework, ModelSpec)} or * the builder. * * @return original client */ - CuratorFramework unwrap(); + AsyncCuratorFramework unwrap(); /** * Return the model being used @@ -108,9 +108,9 @@ public interface ModeledFramework<T> ModeledFramework<T> at(ZPath path); /** - * Return a new Modeled Curator instance with all the same options but using the - * {@link ModelSpecBuilder#nodeName} functor - * to generate the child node's name + * Return a new Modeled Curator instance with all the same options but by calling <code>toString()</code> + * on the model or, if it implements {@link org.apache.curator.x.async.modeled.NodeName}, it's + * <code>nodeName()</code> method to generate the child node's name. * * @param model model to use to generate the name * @return new Modeled Curator instance http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java index d2c31b6..2e8bec3 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java @@ -18,9 +18,9 @@ */ package org.apache.curator.x.async.modeled; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.WatchMode; import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl; import org.apache.zookeeper.WatchedEvent; @@ -29,7 +29,7 @@ import java.util.function.UnaryOperator; public class ModeledFrameworkBuilder<T> { - private CuratorFramework client; + private AsyncCuratorFramework client; private ModelSpec<T> modelSpec; private WatchMode watchMode; private UnaryOperator<WatchedEvent> watcherFilter; @@ -136,7 +136,7 @@ public class ModeledFrameworkBuilder<T> * @param client new client * @return this for chaining */ - public ModeledFrameworkBuilder<T> withClient(CuratorFramework client) + public ModeledFrameworkBuilder<T> withClient(AsyncCuratorFramework client) { this.client = Objects.requireNonNull(client, "client cannot be null"); return this; @@ -146,7 +146,7 @@ public class ModeledFrameworkBuilder<T> { } - ModeledFrameworkBuilder(CuratorFramework client, ModelSpec<T> modelSpec) + ModeledFrameworkBuilder(AsyncCuratorFramework client, ModelSpec<T> modelSpec) { this.client = Objects.requireNonNull(client, "client cannot be null"); this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null"); http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java index 2f375a9..6e3b2e8 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java @@ -19,17 +19,17 @@ package org.apache.curator.x.async.modeled.details; import com.google.common.collect.Lists; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.modeled.ModelSpec; import org.apache.curator.x.async.modeled.ModeledFramework; -import org.apache.curator.x.async.modeled.cached.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.cached.ModeledCache; +import org.apache.curator.x.async.modeled.cached.ZNode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; @@ -44,7 +44,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> CachedModeledFrameworkImpl(ModeledFramework<T> client) { - this(client, new ModeledCacheImpl<>(client.unwrap(), client.modelSpec().path(), client.modelSpec().serializer(), client.modelSpec().createOptions().contains(CreateOption.compress))); + this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec().path(), client.modelSpec().serializer(), client.modelSpec().createOptions().contains(CreateOption.compress))); } private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache) @@ -78,7 +78,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T> } @Override - public CuratorFramework unwrap() + public AsyncCuratorFramework unwrap() { return client.unwrap(); } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java index fb1b888..7ac7007 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModelSpecImpl.java @@ -44,9 +44,10 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator private final List<ACL> aclList; private final Set<CreateOption> createOptions; private final Set<DeleteOption> deleteOptions; + private final long ttl; private final AtomicReference<Schema> schema = new AtomicReference<>(); - public ModelSpecImpl(ZPath path, ModelSerializer<T> serializer, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions) + public ModelSpecImpl(ZPath path, ModelSerializer<T> serializer, CreateMode createMode, List<ACL> aclList, Set<CreateOption> createOptions, Set<DeleteOption> deleteOptions, long ttl) { this.path = Objects.requireNonNull(path, "path cannot be null"); this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null"); @@ -54,6 +55,7 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator this.aclList = ImmutableList.copyOf(Objects.requireNonNull(aclList, "aclList cannot be null")); this.createOptions = ImmutableSet.copyOf(Objects.requireNonNull(createOptions, "createOptions cannot be null")); this.deleteOptions = ImmutableSet.copyOf(Objects.requireNonNull(deleteOptions, "deleteOptions cannot be null")); + this.ttl = ttl; } @Override @@ -71,7 +73,7 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator @Override public ModelSpec<T> at(ZPath newPath) { - return new ModelSpecImpl<>(newPath, serializer, createMode, aclList, createOptions, deleteOptions); + return new ModelSpecImpl<>(newPath, serializer, createMode, aclList, createOptions, deleteOptions, ttl); } @Override @@ -111,6 +113,12 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator } @Override + public long ttl() + { + return ttl; + } + + @Override public Schema schema() { Schema schemaValue = schema.get(); @@ -134,34 +142,38 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator return false; } - ModelSpecImpl<?> that = (ModelSpecImpl<?>)o; + ModelSpecImpl<?> modelSpec = (ModelSpecImpl<?>)o; - if ( !path.equals(that.path) ) + if ( ttl != modelSpec.ttl ) + { + return false; + } + if ( !path.equals(modelSpec.path) ) { return false; } - if ( !serializer.equals(that.serializer) ) + if ( !serializer.equals(modelSpec.serializer) ) { return false; } - if ( createMode != that.createMode ) + if ( createMode != modelSpec.createMode ) { return false; } - if ( !aclList.equals(that.aclList) ) + if ( !aclList.equals(modelSpec.aclList) ) { return false; } - if ( !createOptions.equals(that.createOptions) ) + if ( !createOptions.equals(modelSpec.createOptions) ) { return false; } //noinspection SimplifiableIfStatement - if ( !deleteOptions.equals(that.deleteOptions) ) + if ( !deleteOptions.equals(modelSpec.deleteOptions) ) { return false; } - return schema.equals(that.schema); + return schema.equals(modelSpec.schema); } @Override @@ -173,6 +185,7 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator result = 31 * result + aclList.hashCode(); result = 31 * result + createOptions.hashCode(); result = 31 * result + deleteOptions.hashCode(); + result = 31 * result + (int)(ttl ^ (ttl >>> 32)); result = 31 * result + schema.hashCode(); return result; } @@ -180,7 +193,7 @@ public class ModelSpecImpl<T> implements ModelSpec<T>, SchemaValidator @Override public String toString() { - return "CuratorModelImpl{" + "path=" + path + ", serializer=" + serializer + ", createMode=" + createMode + ", aclList=" + aclList + ", createOptions=" + createOptions + ", deleteOptions=" + deleteOptions + ", schema=" + schema + '}'; + return "ModelSpecImpl{" + "path=" + path + ", serializer=" + serializer + ", createMode=" + createMode + ", aclList=" + aclList + ", createOptions=" + createOptions + ", deleteOptions=" + deleteOptions + ", ttl=" + ttl + ", schema=" + schema + '}'; } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java index fd03977..f0e4fe8 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java @@ -68,6 +68,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> { try { + cache.getListenable().addListener(this); cache.start(); } catch ( Exception e ) @@ -78,6 +79,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> public void close() { + cache.getListenable().removeListener(this); cache.close(); entries.clear(); } @@ -117,17 +119,21 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T> case NODE_ADDED: case NODE_UPDATED: { - ZPath path = ZPath.from(event.toString()); - T model = serializer.deserialize(event.getData().getData()); - entries.put(path, new Entry<>(event.getData().getStat(), model)); - ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; - accept(type, path, event.getData().getStat(), model); + ZPath path = ZPath.parse(event.getData().getPath()); + byte[] bytes = event.getData().getData(); + if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created + { + T model = serializer.deserialize(bytes); + entries.put(path, new Entry<>(event.getData().getStat(), model)); + ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED; + accept(type, path, event.getData().getStat(), model); + } break; } case NODE_REMOVED: { - ZPath path = ZPath.from(event.toString()); + ZPath path = ZPath.parse(event.getData().getPath()); Entry<T> entry = entries.remove(path); T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData()); Stat stat = (entry != null) ? entry.stat : event.getData().getStat(); http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 46e88f5..cc10272 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -18,7 +18,6 @@ */ package org.apache.curator.x.async.modeled.details; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.transaction.CuratorOp; @@ -56,7 +55,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> private final UnaryOperator<CuratorEvent> resultFilter; private final AsyncCuratorFrameworkDsl dslClient; - public static <T> ModeledFrameworkImpl<T> build(CuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter) + public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter) { boolean localIsWatched = (watchMode != null); @@ -65,12 +64,11 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess; - AsyncCuratorFramework asyncClient = AsyncCuratorFramework.wrap(client); - AsyncCuratorFrameworkDsl dslClient = asyncClient.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter); + AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter); WatchableAsyncCuratorFramework watchableClient = localIsWatched ? dslClient.watched() : dslClient; return new ModeledFrameworkImpl<>( - asyncClient, + client, dslClient, watchableClient, model, @@ -106,9 +104,9 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> } @Override - public CuratorFramework unwrap() + public AsyncCuratorFramework unwrap() { - return client.unwrap(); + return client; } @Override @@ -121,7 +119,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> public AsyncStage<String> set(T item, Stat storingStatIn) { byte[] bytes = modelSpec.serializer().serialize(item); - return dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn).forPath(modelSpec.path().fullPath(), bytes); + return dslClient.create().withOptions(modelSpec.createOptions(), modelSpec.createMode(), fixAclList(modelSpec.aclList()), storingStatIn, modelSpec.ttl()).forPath(modelSpec.path().fullPath(), bytes); } private List<ACL> fixAclList(List<ACL> aclList) @@ -277,7 +275,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T> { return client.transactionOp() .create() - .withOptions(modelSpec.createMode(), modelSpec.aclList(), modelSpec.createOptions().contains(CreateOption.compress)) + .withOptions(modelSpec.createMode(), modelSpec.aclList(), modelSpec.createOptions().contains(CreateOption.compress), modelSpec.ttl()) .forPath(modelSpec.path().fullPath(), modelSpec.serializer().serialize(model)); } http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java index a184f30..5bdbe01 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ZPathImpl.java @@ -21,7 +21,6 @@ package org.apache.curator.x.async.modeled.details; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; -import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.modeled.ZPath; import org.apache.zookeeper.common.PathUtils; import java.util.Arrays; @@ -36,8 +35,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.curator.utils.ZKPaths.PATH_SEPARATOR; -import static org.apache.curator.x.async.modeled.ZPath.idName; -import static org.apache.curator.x.async.modeled.ZPath.parameterNodeName; public class ZPathImpl implements ZPath { @@ -195,8 +192,9 @@ public class ZPathImpl implements ZPath @Override public String toString() { - String value = nodes.stream().map(name -> name.equals(parameterNodeName) ? idName : name).collect(Collectors.joining(PATH_SEPARATOR, PATH_SEPARATOR, "")); - return "ZPathImpl{" + value + '}'; + return nodes.subList(1, nodes.size()) + .stream().map(name -> name.equals(parameterNodeName) ? idName : name) + .collect(Collectors.joining(PATH_SEPARATOR, PATH_SEPARATOR, "")); } @Override http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework.java index 3e14240..0e67443 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework.java @@ -1,6 +1,6 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @@ -14,11 +14,11 @@ public interface TypedModeledFramework<M, P1> * @param p1 the parameter * @return ZPath */ - ModeledFramework<M> resolved(CuratorFramework client, P1 p1); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework10.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework10.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework10.java index c5f6bdc..0b9b2a1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework10.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework10.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework10<M, P1, P2, P3, P4, P5, P6, P7, P8, P9, P10> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9, P10 p10); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework2.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework2.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework2.java index 596a869..b387436 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework2.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework2.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework2<M, P1, P2> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework3.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework3.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework3.java index 647238d..5bb3a1f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework3.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework3.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework3<M, P1, P2, P3> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework4.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework4.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework4.java index 5f9a069..9887022 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework4.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework4.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework4<M, P1, P2, P3, P4> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework5.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework5.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework5.java index 723e65a..bea0fb5 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework5.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework5.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework5<M, P1, P2, P3, P4, P5> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework6.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework6.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework6.java index 2dea518..eb91734 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework6.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework6.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework6<M, P1, P2, P3, P4, P5, P6> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework7.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework7.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework7.java index 19a7e67..4643862 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework7.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework7.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework7<M, P1, P2, P3, P4, P5, P6, P7> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework8.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework8.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework8.java index 50dc248..c4f2426 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework8.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework8.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework8<M, P1, P2, P3, P4, P5, P6, P7, P8> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework9.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework9.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework9.java index 0a2632c..7604f34 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework9.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/typed/TypedModeledFramework9.java @@ -1,17 +1,17 @@ package org.apache.curator.x.async.modeled.typed; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ModeledFrameworkBuilder; @FunctionalInterface public interface TypedModeledFramework9<M, P1, P2, P3, P4, P5, P6, P7, P8, P9> { - ModeledFramework<M> resolved(CuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9); + ModeledFramework<M> resolved(AsyncCuratorFramework client, P1 p1, P2 p2, P3 p3, P4 p4, P5 p5, P6 p6, P7 p7, P8 p8, P9 p9); /** * Return a new TypedModeledFramework using the given modeled framework builder and typed model spec. - * When {@link #resolved(CuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the + * When {@link #resolved(AsyncCuratorFramework, Object, Object, Object, Object, Object, Object, Object, Object, Object)} is called the actual ModeledFramework is generated with the * resolved model spec * * @param frameworkBuilder ModeledFrameworkBuilder http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java index acca126..f14c485 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java @@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.CompletableBaseClassForTests; import org.apache.curator.x.async.modeled.models.TestModel; @@ -73,7 +74,7 @@ public class TestModeledFramework extends CompletableBaseClassForTests { TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); TestModel rawModel2 = new TestModel("Wayne", "Rooney", "Old Trafford", 10, BigInteger.valueOf(1)); - ModeledFramework<TestModel> client = ModeledFramework.wrap(rawClient, modelSpec); + ModeledFramework<TestModel> client = ModeledFramework.wrap(AsyncCuratorFramework.wrap(rawClient), modelSpec); AsyncStage<String> stage = client.set(rawModel); Assert.assertNull(stage.event()); complete(stage, (s, e) -> Assert.assertNotNull(s)); @@ -88,10 +89,10 @@ public class TestModeledFramework extends CompletableBaseClassForTests public void testBackwardCompatibility() { TestNewerModel rawNewModel = new TestNewerModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1), 100); - ModeledFramework<TestNewerModel> clientForNew = ModeledFramework.wrap(rawClient, newModelSpec); + ModeledFramework<TestNewerModel> clientForNew = ModeledFramework.wrap(AsyncCuratorFramework.wrap(rawClient), newModelSpec); complete(clientForNew.set(rawNewModel), (s, e) -> Assert.assertNotNull(s)); - ModeledFramework<TestModel> clientForOld = ModeledFramework.wrap(rawClient, modelSpec); + ModeledFramework<TestModel> clientForOld = ModeledFramework.wrap(AsyncCuratorFramework.wrap(rawClient), modelSpec); complete(clientForOld.read(), (model, e) -> Assert.assertTrue(rawNewModel.equalsOld(model))); } @@ -99,7 +100,7 @@ public class TestModeledFramework extends CompletableBaseClassForTests public void testWatched() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - ModeledFramework<TestModel> client = ModeledFramework.builder(rawClient, modelSpec).watched().build(); + ModeledFramework<TestModel> client = ModeledFramework.builder(AsyncCuratorFramework.wrap(rawClient), modelSpec).watched().build(); client.checkExists().event().whenComplete((event, ex) -> latch.countDown()); timing.sleepABit(); Assert.assertEquals(latch.getCount(), 1); @@ -111,7 +112,7 @@ public class TestModeledFramework extends CompletableBaseClassForTests public void testGetChildren() { TestModel model = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); - ModeledFramework<TestModel> client = ModeledFramework.builder(rawClient, modelSpec).build(); + ModeledFramework<TestModel> client = ModeledFramework.builder(AsyncCuratorFramework.wrap(rawClient), modelSpec).build(); complete(client.at("one").set(model)); complete(client.at("two").set(model)); complete(client.at("three").set(model)); http://git-wip-us.apache.org/repos/asf/curator/blob/713bf467/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledFramework.java index 70ef93e..ae8a6bd 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/details/TestCachedModeledFramework.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.CompletableBaseClassForTests; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; import org.apache.curator.x.async.modeled.ModelSpec; @@ -52,7 +53,7 @@ public class TestCachedModeledFramework extends CompletableBaseClassForTests rawClient.start(); ModelSerializer<TestSimpleModel> serializer = new JacksonModelSerializer<>(TestSimpleModel.class); - client = ModeledFramework.builder(rawClient, ModelSpec.builder(path, serializer).build()).build().cached(); + client = ModeledFramework.builder(AsyncCuratorFramework.wrap(rawClient), ModelSpec.builder(path, serializer).build()).build().cached(); } @AfterMethod
