finished docs for the pub-sub example
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5ac1a331 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5ac1a331 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5ac1a331 Branch: refs/heads/CURATOR-397 Commit: 5ac1a3314e372d8bd22ef7bba704b5621a1638b8 Parents: c97fae4 Author: randgalt <[email protected]> Authored: Wed May 3 22:59:47 2017 -0500 Committer: randgalt <[email protected]> Committed: Wed May 3 22:59:47 2017 -0500 ---------------------------------------------------------------------- curator-examples/src/main/java/pubsub/README.md | 16 ++++++++++++++ .../src/main/java/pubsub/SubPubTest.java | 23 +++++++++++++------- 2 files changed, 31 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/README.md ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/README.md b/curator-examples/src/main/java/pubsub/README.md index 51a7048..adea2ef 100644 --- a/curator-examples/src/main/java/pubsub/README.md +++ b/curator-examples/src/main/java/pubsub/README.md @@ -56,3 +56,19 @@ In this example, the TypedModelSpecs are defined in `Clients.java`. E.g. public static final TypedModeledFramework<Instance, InstanceType> instanceClient = TypedModeledFramework.from(ModeledFramework.builder(), ModelSpecs.instanceModelSpec) ``` + +## Publisher + +`Publisher.java` shows how to use the ModeledFramework to write models. There are methods to write single instances and to write lists of instances in a transaction. Each publish method resolves the appropriate typed client and then calls its `set()` method with the given model. + +## Subscriber + +`Subscriber.java` uses CachedModeledFrameworks to listen for changes on the parent nodes for all of the models in this example. Each of the methods resolves the appropriate typed client and then starts the cache (via `cached()`). + +## SubPubTest + +`SubPubTest.java` is a class that exercises this example. + +* `start()` uses `Subscriber` to start a `CachedModeledFramework` for each combination of the Instance + InstanceType, LocationAvailable + Group + Priority, and UserCreated + Group + Priority. It then adds a simple listener to each cache that merely prints the class name and path whenever an update occurs (see `generalListener()`). +* `start()` also starts a scheduled task that runs every second. This task calls `publishSomething()` +* `publishSomething()` randomly publishes either a single Instance, LocationAvailable, UserCreated or a list of those. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/SubPubTest.java ---------------------------------------------------------------------- diff --git a/curator-examples/src/main/java/pubsub/SubPubTest.java b/curator-examples/src/main/java/pubsub/SubPubTest.java index 90b7699..b5c1629 100644 --- a/curator-examples/src/main/java/pubsub/SubPubTest.java +++ b/curator-examples/src/main/java/pubsub/SubPubTest.java @@ -50,14 +50,14 @@ public class SubPubTest implements Closeable { private final TestingServer testingServer; private final AsyncCuratorFramework client; - private final Publisher publisher; private final ScheduledExecutorService executorService; private final List<CachedModeledFramework<Instance>> instanceSubscribers = new ArrayList<>(); private final List<CachedModeledFramework<LocationAvailable>> locationAvailableSubscribers = new ArrayList<>(); private final List<CachedModeledFramework<UserCreated>> userCreatedSubscribers = new ArrayList<>(); - private static final AtomicLong id = new AtomicLong(1); + private static final AtomicLong nextId = new AtomicLong(1); + // arrays of random values used for this example private static final Group[] groups = {new Group("main"), new Group("admin")}; private static final String[] hostnames = {"host1", "host2", "host3"}; private static final Integer[] ports = {80, 443, 9999}; @@ -70,7 +70,7 @@ public class SubPubTest implements Closeable try ( SubPubTest subPubTest = new SubPubTest() ) { subPubTest.start(); - TimeUnit.MINUTES.sleep(1); + TimeUnit.MINUTES.sleep(1); // run the test for a minute then exit } } @@ -78,7 +78,6 @@ public class SubPubTest implements Closeable { this.testingServer = new TestingServer(); client = AsyncCuratorFramework.wrap(CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1))); - publisher = new Publisher(client); executorService = Executors.newSingleThreadScheduledExecutor(); } @@ -86,30 +85,37 @@ public class SubPubTest implements Closeable { client.unwrap().start(); + Publisher publisher = new Publisher(client); Subscriber subscriber = new Subscriber(client); + + // start a subscriber/cache for Instances of each InstanceType instanceSubscribers.addAll( Arrays.stream(InstanceType.values()) .map(subscriber::startInstanceSubscriber) .collect(Collectors.toList()) ); + // start a subscriber/cache for LocationAvailables of each combination of Group and Priority locationAvailableSubscribers.addAll( Arrays.stream(Priority.values()) .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startLocationAvailableSubscriber(group, priority))) .collect(Collectors.toList()) ); + // start a subscriber/cache for UserCreateds of each combination of Group and Priority userCreatedSubscribers.addAll( Arrays.stream(Priority.values()) .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startUserCreatedSubscriber(group, priority))) .collect(Collectors.toList()) ); + // add listeners for each of the caches instanceSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); locationAvailableSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); userCreatedSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener())); - executorService.scheduleAtFixedRate(this::publishSomething, 1, 1, TimeUnit.SECONDS); + // schedule the publisher task once a second + executorService.scheduleAtFixedRate(() -> publishSomething(publisher), 1, 1, TimeUnit.SECONDS); } @Override @@ -132,9 +138,10 @@ public class SubPubTest implements Closeable testingServer.close(); } - private void publishSomething() + private void publishSomething(Publisher publisher) { - switch ( ThreadLocalRandom.current().nextInt(5) ) + // randomly do some publishing - either single items or lists of items in a transaction + switch ( ThreadLocalRandom.current().nextInt(6) ) { case 0: { @@ -206,6 +213,6 @@ public class SubPubTest implements Closeable private String nextId() { - return Long.toString(id.getAndIncrement()); + return Long.toString(nextId.getAndIncrement()); } }
