Repository: curator Updated Branches: refs/heads/CURATOR-397 69f1829d0 -> aa86931b9
refactoring Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/37927efa Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/37927efa Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/37927efa Branch: refs/heads/CURATOR-397 Commit: 37927efa65ca755407f1032ef45cb46a11372184 Parents: 69f1829 Author: randgalt <[email protected]> Authored: Wed May 3 07:42:13 2017 -0500 Committer: randgalt <[email protected]> Committed: Wed May 3 07:42:13 2017 -0500 ---------------------------------------------------------------------- .../src/main/java/pubsub/Publisher.java | 136 ------------------- .../src/main/java/pubsub/SubPubTest.java | 3 +- .../src/main/java/pubsub/Subscriber.java | 72 ---------- .../src/main/java/pubsub/util/Publisher.java | 136 +++++++++++++++++++ .../src/main/java/pubsub/util/Subscriber.java | 72 ++++++++++ 5 files changed, 210 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/37927efa/curator-examples/src/main/java/pubsub/Publisher.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/Publisher.java b/curator-examples/src/main/java/pubsub/Publisher.java deleted file mode 100644 index 8854b35..0000000 --- a/curator-examples/src/main/java/pubsub/Publisher.java +++ /dev/null @@ -1,136 +0,0 @@ -package pubsub; - -import org.apache.curator.framework.api.transaction.CuratorOp; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.modeled.ModeledFramework; -import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import pubsub.messages.LocationAvailable; -import pubsub.messages.UserCreated; -import pubsub.models.Group; -import pubsub.models.Instance; -import pubsub.models.Message; -import pubsub.models.Priority; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -import static pubsub.builders.Clients.*; - -public class Publisher -{ - private final Logger log = LoggerFactory.getLogger(getClass()); - private final AsyncCuratorFramework client; - - public Publisher(AsyncCuratorFramework client) - { - this.client = Objects.requireNonNull(client, "client cannot be null"); - } - - /** - * Publish the given instance using the Instance client template - * - * @param instance instance to publish - */ - public void publishInstance(Instance instance) - { - ModeledFramework<Instance> resolvedClient = instanceClient - .resolved(client, instance.getType()) // this resolves to the parent path - .resolved(instance); // this resolves to a child node - uses the Instance's id because Instance extends NodeName - resolvedClient.set(instance).exceptionally(e -> { - log.error("Could not publish instance: " + instance, e); - return null; - }); - } - - /** - * Publish the given instances using the Instance client template in a transaction - * - * @param instances instances to publish - */ - public void publishInstances(List<Instance> instances) - { - List<CuratorOp> operations = instances.stream() - .map(instance -> instanceClient - .resolved(client, instance.getType()) // this resolves to the parent path - .resolved(instance) // this resolves to a child node - uses the Instance's id because Instance extends NodeName - .createOp(instance) - ) - .collect(Collectors.toList()); - client.transaction().forOperations(operations).exceptionally(e -> { - log.error("Could not publish instances: " + instances, e); - return null; - }); - } - - /** - * Publish the given LocationAvailable using the LocationAvailable client template - * - * @param group group - * @param locationAvailable message to publish - */ - public void publishLocationAvailable(Group group, LocationAvailable locationAvailable) - { - publishMessage(locationAvailableClient, group, locationAvailable); - } - - /** - * Publish the given UserCreated using the UserCreated client template - * - * @param group group - * @param userCreated message to publish - */ - public void publishUserCreated(Group group, UserCreated userCreated) - { - publishMessage(userCreatedClient, group, userCreated); - } - - /** - * Publish the given LocationAvailables using the LocationAvailable client template in a transaction - * - * @param group group - * @param locationsAvailable messages to publish - */ - public void publishLocationsAvailable(Group group, List<LocationAvailable> locationsAvailable) - { - publishMessages(locationAvailableClient, group, locationsAvailable); - } - - /** - * Publish the given UserCreateds using the UserCreated client template in a transaction - * - * @param group group - * @param usersCreated messages to publish - */ - public void publishUsersCreated(Group group, List<UserCreated> usersCreated) - { - publishMessages(userCreatedClient, group, usersCreated); - } - - private <T extends Message> void publishMessage(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, T message) - { - ModeledFramework<T> resolvedClient = typedClient - .resolved(client, group, message.getPriority()) - .resolved(message); - resolvedClient.set(message).exceptionally(e -> { - log.error("Could not publish message: " + message, e); - return null; - }); - } - - private <T extends Message> void publishMessages(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, List<T> messages) - { - List<CuratorOp> operations = messages.stream() - .map(message -> typedClient - .resolved(client, group, message.getPriority()) // this resolves to the parent path - .resolved(message) // this resolves to a child node - uses the Message's id because Message extends NodeName - .createOp(message) - ) - .collect(Collectors.toList()); - client.transaction().forOperations(operations).exceptionally(e -> { - log.error("Could not publish messages: " + messages, e); - return null; - }); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/37927efa/curator-examples/src/main/java/pubsub/SubPubTest.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/SubPubTest.java b/curator-examples/src/main/java/pubsub/SubPubTest.java index 32bd578..ed7b95c 100644 --- a/curator-examples/src/main/java/pubsub/SubPubTest.java +++ b/curator-examples/src/main/java/pubsub/SubPubTest.java @@ -12,13 +12,14 @@ import pubsub.models.Group; import pubsub.models.Instance; import pubsub.models.InstanceType; import pubsub.models.Priority; +import pubsub.util.Publisher; +import pubsub.util.Subscriber; import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; http://git-wip-us.apache.org/repos/asf/curator/blob/37927efa/curator-examples/src/main/java/pubsub/Subscriber.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/Subscriber.java b/curator-examples/src/main/java/pubsub/Subscriber.java deleted file mode 100644 index 1acee0b..0000000 --- a/curator-examples/src/main/java/pubsub/Subscriber.java +++ /dev/null @@ -1,72 +0,0 @@ -package pubsub; - -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; -import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; -import pubsub.messages.LocationAvailable; -import pubsub.messages.UserCreated; -import pubsub.models.Group; -import pubsub.models.Instance; -import pubsub.models.InstanceType; -import pubsub.models.Message; -import pubsub.models.Priority; - -import static pubsub.builders.Clients.*; - -public class Subscriber -{ - private final AsyncCuratorFramework client; - - public Subscriber(AsyncCuratorFramework client) - { - this.client = client; - } - - /** - * Start a subscriber (a CachedModeledFramework instance) using the LocationAvailable client template - * - * @param group group to listen for - * @param priority priority to listen for - * @return CachedModeledFramework instance (already started) - */ - public CachedModeledFramework<LocationAvailable> startLocationAvailableSubscriber(Group group, Priority priority) - { - return startSubscriber(locationAvailableClient, group, priority); - } - - /** - * Start a subscriber (a CachedModeledFramework instance) using the UserCreated client template - * - * @param group group to listen for - * @param priority priority to listen for - * @return CachedModeledFramework instance (already started) - */ - public CachedModeledFramework<UserCreated> startUserCreatedSubscriber(Group group, Priority priority) - { - return startSubscriber(userCreatedClient, group, priority); - } - - /** - * Start a subscriber (a CachedModeledFramework instance) using the Instance client template - * - * @param instanceType type to listen for - * @return CachedModeledFramework instance (already started) - */ - public CachedModeledFramework<Instance> startInstanceSubscriber(InstanceType instanceType) - { - CachedModeledFramework<Instance> resolved = instanceClient - .resolved(client, instanceType) // resolves to the parent path - models are children of this path - .cached(); // makes a cached modeled instance - resolved.start(); - return resolved; - } - - private <T extends Message> CachedModeledFramework<T> startSubscriber(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, Priority priority) - { - CachedModeledFramework<T> resolved = typedClient - .resolved(client, group, priority) // resolves to the parent path - models are children of this path - .cached(); // makes a cached modeled instance - resolved.start(); - return resolved; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/37927efa/curator-examples/src/main/java/pubsub/util/Publisher.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/util/Publisher.java b/curator-examples/src/main/java/pubsub/util/Publisher.java new file mode 100644 index 0000000..c7541fa --- /dev/null +++ b/curator-examples/src/main/java/pubsub/util/Publisher.java @@ -0,0 +1,136 @@ +package pubsub.util; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; +import pubsub.models.Group; +import pubsub.models.Instance; +import pubsub.models.Message; +import pubsub.models.Priority; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static pubsub.builders.Clients.*; + +public class Publisher +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AsyncCuratorFramework client; + + public Publisher(AsyncCuratorFramework client) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + } + + /** + * Publish the given instance using the Instance client template + * + * @param instance instance to publish + */ + public void publishInstance(Instance instance) + { + ModeledFramework<Instance> resolvedClient = instanceClient + .resolved(client, instance.getType()) // this resolves to the parent path + .resolved(instance); // this resolves to a child node - uses the Instance's id because Instance extends NodeName + resolvedClient.set(instance).exceptionally(e -> { + log.error("Could not publish instance: " + instance, e); + return null; + }); + } + + /** + * Publish the given instances using the Instance client template in a transaction + * + * @param instances instances to publish + */ + public void publishInstances(List<Instance> instances) + { + List<CuratorOp> operations = instances.stream() + .map(instance -> instanceClient + .resolved(client, instance.getType()) // this resolves to the parent path + .resolved(instance) // this resolves to a child node - uses the Instance's id because Instance extends NodeName + .createOp(instance) + ) + .collect(Collectors.toList()); + client.transaction().forOperations(operations).exceptionally(e -> { + log.error("Could not publish instances: " + instances, e); + return null; + }); + } + + /** + * Publish the given LocationAvailable using the LocationAvailable client template + * + * @param group group + * @param locationAvailable message to publish + */ + public void publishLocationAvailable(Group group, LocationAvailable locationAvailable) + { + publishMessage(locationAvailableClient, group, locationAvailable); + } + + /** + * Publish the given UserCreated using the UserCreated client template + * + * @param group group + * @param userCreated message to publish + */ + public void publishUserCreated(Group group, UserCreated userCreated) + { + publishMessage(userCreatedClient, group, userCreated); + } + + /** + * Publish the given LocationAvailables using the LocationAvailable client template in a transaction + * + * @param group group + * @param locationsAvailable messages to publish + */ + public void publishLocationsAvailable(Group group, List<LocationAvailable> locationsAvailable) + { + publishMessages(locationAvailableClient, group, locationsAvailable); + } + + /** + * Publish the given UserCreateds using the UserCreated client template in a transaction + * + * @param group group + * @param usersCreated messages to publish + */ + public void publishUsersCreated(Group group, List<UserCreated> usersCreated) + { + publishMessages(userCreatedClient, group, usersCreated); + } + + private <T extends Message> void publishMessage(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, T message) + { + ModeledFramework<T> resolvedClient = typedClient + .resolved(client, group, message.getPriority()) + .resolved(message); + resolvedClient.set(message).exceptionally(e -> { + log.error("Could not publish message: " + message, e); + return null; + }); + } + + private <T extends Message> void publishMessages(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, List<T> messages) + { + List<CuratorOp> operations = messages.stream() + .map(message -> typedClient + .resolved(client, group, message.getPriority()) // this resolves to the parent path + .resolved(message) // this resolves to a child node - uses the Message's id because Message extends NodeName + .createOp(message) + ) + .collect(Collectors.toList()); + client.transaction().forOperations(operations).exceptionally(e -> { + log.error("Could not publish messages: " + messages, e); + return null; + }); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/37927efa/curator-examples/src/main/java/pubsub/util/Subscriber.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/util/Subscriber.java b/curator-examples/src/main/java/pubsub/util/Subscriber.java new file mode 100644 index 0000000..3b0d542 --- /dev/null +++ b/curator-examples/src/main/java/pubsub/util/Subscriber.java @@ -0,0 +1,72 @@ +package pubsub.util; + +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.typed.TypedModeledFramework2; +import pubsub.messages.LocationAvailable; +import pubsub.messages.UserCreated; +import pubsub.models.Group; +import pubsub.models.Instance; +import pubsub.models.InstanceType; +import pubsub.models.Message; +import pubsub.models.Priority; + +import static pubsub.builders.Clients.*; + +public class Subscriber +{ + private final AsyncCuratorFramework client; + + public Subscriber(AsyncCuratorFramework client) + { + this.client = client; + } + + /** + * Start a subscriber (a CachedModeledFramework instance) using the LocationAvailable client template + * + * @param group group to listen for + * @param priority priority to listen for + * @return CachedModeledFramework instance (already started) + */ + public CachedModeledFramework<LocationAvailable> startLocationAvailableSubscriber(Group group, Priority priority) + { + return startSubscriber(locationAvailableClient, group, priority); + } + + /** + * Start a subscriber (a CachedModeledFramework instance) using the UserCreated client template + * + * @param group group to listen for + * @param priority priority to listen for + * @return CachedModeledFramework instance (already started) + */ + public CachedModeledFramework<UserCreated> startUserCreatedSubscriber(Group group, Priority priority) + { + return startSubscriber(userCreatedClient, group, priority); + } + + /** + * Start a subscriber (a CachedModeledFramework instance) using the Instance client template + * + * @param instanceType type to listen for + * @return CachedModeledFramework instance (already started) + */ + public CachedModeledFramework<Instance> startInstanceSubscriber(InstanceType instanceType) + { + CachedModeledFramework<Instance> resolved = instanceClient + .resolved(client, instanceType) // resolves to the parent path - models are children of this path + .cached(); // makes a cached modeled instance + resolved.start(); + return resolved; + } + + private <T extends Message> CachedModeledFramework<T> startSubscriber(TypedModeledFramework2<T, Group, Priority> typedClient, Group group, Priority priority) + { + CachedModeledFramework<T> resolved = typedClient + .resolved(client, group, priority) // resolves to the parent path - models are children of this path + .cached(); // makes a cached modeled instance + resolved.start(); + return resolved; + } +}
