finished docs for the pub-sub example

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5ac1a331
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5ac1a331
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5ac1a331

Branch: refs/heads/CURATOR-397
Commit: 5ac1a3314e372d8bd22ef7bba704b5621a1638b8
Parents: c97fae4
Author: randgalt <[email protected]>
Authored: Wed May 3 22:59:47 2017 -0500
Committer: randgalt <[email protected]>
Committed: Wed May 3 22:59:47 2017 -0500

----------------------------------------------------------------------
 curator-examples/src/main/java/pubsub/README.md | 16 ++++++++++++++
 .../src/main/java/pubsub/SubPubTest.java        | 23 +++++++++++++-------
 2 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/README.md
----------------------------------------------------------------------
diff --git a/curator-examples/src/main/java/pubsub/README.md 
b/curator-examples/src/main/java/pubsub/README.md
index 51a7048..adea2ef 100644
--- a/curator-examples/src/main/java/pubsub/README.md
+++ b/curator-examples/src/main/java/pubsub/README.md
@@ -56,3 +56,19 @@ In this example, the TypedModelSpecs are defined in 
`Clients.java`. E.g.
 public static final TypedModeledFramework<Instance, InstanceType> 
instanceClient = 
    TypedModeledFramework.from(ModeledFramework.builder(), 
ModelSpecs.instanceModelSpec)
 ```
+
+## Publisher
+
+`Publisher.java` shows how to use the ModeledFramework to write models. There 
are methods to write single instances and to write lists of instances in a 
transaction. Each publish method resolves the appropriate typed client and then 
calls its `set()` method with the given model.
+
+## Subscriber
+
+`Subscriber.java` uses CachedModeledFrameworks to listen for changes on the 
parent nodes for all of the models in this example. Each of the methods 
resolves the appropriate typed client and then starts the cache (via 
`cached()`).
+
+## SubPubTest
+
+`SubPubTest.java` is a class that exercises this example. 
+
+* `start()` uses `Subscriber` to start a `CachedModeledFramework` for each 
InstanceType (for Instance models) and for each combination of Group and 
Priority (for both LocationAvailable and UserCreated models). It then adds a simple listener to 
each cache that merely prints the class name and path whenever an update occurs 
(see `generalListener()`).
+* `start()` also starts a scheduled task that runs every second. This task 
calls `publishSomething()`.
+* `publishSomething()` randomly publishes either a single Instance, 
LocationAvailable, or UserCreated, or a list of those (written in a transaction).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/SubPubTest.java
----------------------------------------------------------------------
diff --git a/curator-examples/src/main/java/pubsub/SubPubTest.java 
b/curator-examples/src/main/java/pubsub/SubPubTest.java
index 90b7699..b5c1629 100644
--- a/curator-examples/src/main/java/pubsub/SubPubTest.java
+++ b/curator-examples/src/main/java/pubsub/SubPubTest.java
@@ -50,14 +50,14 @@ public class SubPubTest implements Closeable
 {
     private final TestingServer testingServer;
     private final AsyncCuratorFramework client;
-    private final Publisher publisher;
     private final ScheduledExecutorService executorService;
     private final List<CachedModeledFramework<Instance>> instanceSubscribers = 
new ArrayList<>();
     private final List<CachedModeledFramework<LocationAvailable>> 
locationAvailableSubscribers = new ArrayList<>();
     private final List<CachedModeledFramework<UserCreated>> 
userCreatedSubscribers = new ArrayList<>();
 
-    private static final AtomicLong id = new AtomicLong(1);
+    private static final AtomicLong nextId = new AtomicLong(1);
 
+    // arrays of random values used for this example
     private static final Group[] groups = {new Group("main"), new 
Group("admin")};
     private static final String[] hostnames = {"host1", "host2", "host3"};
     private static final Integer[] ports = {80, 443, 9999};
@@ -70,7 +70,7 @@ public class SubPubTest implements Closeable
         try ( SubPubTest subPubTest = new SubPubTest() )
         {
             subPubTest.start();
-            TimeUnit.MINUTES.sleep(1);
+            TimeUnit.MINUTES.sleep(1);  // run the test for a minute then exit
         }
     }
 
@@ -78,7 +78,6 @@ public class SubPubTest implements Closeable
     {
         this.testingServer = new TestingServer();
         client = 
AsyncCuratorFramework.wrap(CuratorFrameworkFactory.newClient(testingServer.getConnectString(),
 new RetryOneTime(1)));
-        publisher = new Publisher(client);
         executorService = Executors.newSingleThreadScheduledExecutor();
     }
 
@@ -86,30 +85,37 @@ public class SubPubTest implements Closeable
     {
         client.unwrap().start();
 
+        Publisher publisher = new Publisher(client);
         Subscriber subscriber = new Subscriber(client);
+
+        // start a subscriber/cache for Instances of each InstanceType
         instanceSubscribers.addAll(
             Arrays.stream(InstanceType.values())
             .map(subscriber::startInstanceSubscriber)
             .collect(Collectors.toList())
         );
 
+        // start a subscriber/cache for LocationAvailables of each combination 
of Group and Priority
         locationAvailableSubscribers.addAll(
             Arrays.stream(Priority.values())
                 .flatMap(priority -> Arrays.stream(groups).map(group -> 
subscriber.startLocationAvailableSubscriber(group, priority)))
                 .collect(Collectors.toList())
         );
 
+        // start a subscriber/cache for UserCreateds of each combination of 
Group and Priority
         userCreatedSubscribers.addAll(
             Arrays.stream(Priority.values())
                 .flatMap(priority -> Arrays.stream(groups).map(group -> 
subscriber.startUserCreatedSubscriber(group, priority)))
                 .collect(Collectors.toList())
         );
 
+        // add listeners for each of the caches
         instanceSubscribers.forEach(s -> 
s.getCache().listenable().addListener(generalListener()));
         locationAvailableSubscribers.forEach(s -> 
s.getCache().listenable().addListener(generalListener()));
         userCreatedSubscribers.forEach(s -> 
s.getCache().listenable().addListener(generalListener()));
 
-        executorService.scheduleAtFixedRate(this::publishSomething, 1, 1, 
TimeUnit.SECONDS);
+        // schedule the publisher task once a second
+        executorService.scheduleAtFixedRate(() -> publishSomething(publisher), 
1, 1, TimeUnit.SECONDS);
     }
 
     @Override
@@ -132,9 +138,10 @@ public class SubPubTest implements Closeable
         testingServer.close();
     }
 
-    private void publishSomething()
+    private void publishSomething(Publisher publisher)
     {
-        switch ( ThreadLocalRandom.current().nextInt(5) )
+        // randomly do some publishing - either single items or lists of items 
in a transaction
+        switch ( ThreadLocalRandom.current().nextInt(6) )
         {
             case 0:
             {
@@ -206,6 +213,6 @@ public class SubPubTest implements Closeable
 
     private String nextId()
     {
-        return Long.toString(id.getAndIncrement());
+        return Long.toString(nextId.getAndIncrement());
     }
 }

Reply via email to