Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 748adc644 -> 8ae86e153


Merge branch 'STORM-1868' of https://github.com/arunmahadevan/storm into STORM-1868


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ed7a09a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ed7a09a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ed7a09a

Branch: refs/heads/1.0.x-branch
Commit: 0ed7a09ae8a1ca834dd2ac5cc48b871f2a890459
Parents: 748adc6
Author: Sriharsha Chintalapani <har...@hortonworks.com>
Authored: Fri May 27 14:14:16 2016 -0700
Committer: Sriharsha Chintalapani <har...@hortonworks.com>
Committed: Fri May 27 14:18:47 2016 -0700

----------------------------------------------------------------------
 .../starter/trident/TridentKafkaWordCount.java  | 67 ++++++++++++++------
 1 file changed, 47 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0ed7a09a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
index dc4cb4b..2b25fad 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -26,6 +26,7 @@ package org.apache.storm.starter.trident;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.topology.TopologyBuilder;
@@ -182,17 +183,33 @@ public class TridentKafkaWordCount {
      * <p>
      * To run this topology ensure you have a kafka broker running.
      * </p>
-     * Create a topic test with command line,
+     * Create a topic 'test' from the command line,
+     * <pre>
      * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
+     * </pre>
+     * To run in local mode,
+     * <pre>
+     * storm jar storm-starter-topologies-{version}.jar org.apache.storm.starter.trident.TridentKafkaWordCount
+     * </pre>
+     * This will also run a local DRPC query and print the word counts.
+     * <p>
+     * To run in distributed mode, run it with a topology name. You will also need to start a drpc server and
+     * specify the drpc server details in storm.yaml before submitting the topology.
+     * </p>
+     * <pre>
+     * storm jar storm-starter-topologies-{version}.jar org.apache.storm.starter.trident.TridentKafkaWordCount zkhost:port broker:port wordcount
+     * </pre>
+     * This will submit two topologies, one for the producer and another for the consumer. You can query the results
+     * (word counts) by running an external drpc query against the drpc server.
      */
     public static void main(String[] args) throws Exception {
 
         String zkUrl = "localhost:2181";        // the defaults.
         String brokerUrl = "localhost:9092";
 
-        if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) {
-            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");
-            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");
+        if (args.length > 3 || (args.length == 1 && args[0].matches("^-h|--help$"))) {
+            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url] [topology name]");
+            System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "] [wordcount]");
             System.exit(1);
         } else if (args.length == 1) {
             zkUrl = args[0];
@@ -205,25 +222,35 @@ public class TridentKafkaWordCount {
 
         TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl);
 
-        LocalDRPC drpc = new LocalDRPC();
-        LocalCluster cluster = new LocalCluster();
+        if (args.length == 3)  {
+            Config conf = new Config();
+            conf.setMaxSpoutPending(20);
+            conf.setNumWorkers(1);
+            // submit the consumer topology.
+            StormSubmitter.submitTopology(args[2] + "-consumer", conf, wordCount.buildConsumerTopology(null));
+            // submit the producer topology.
+            StormSubmitter.submitTopology(args[2] + "-producer", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
+        } else {
+            LocalDRPC drpc = new LocalDRPC();
+            LocalCluster cluster = new LocalCluster();
 
-        // submit the consumer topology.
-        cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
+            // submit the consumer topology.
+            cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
 
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        // submit the producer topology.
-        cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
+            Config conf = new Config();
+            conf.setMaxSpoutPending(20);
+            // submit the producer topology.
+            cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
 
-        // keep querying the word counts for a minute.
-        for (int i = 0; i < 60; i++) {
-            System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
-            Thread.sleep(1000);
-        }
+            // keep querying the word counts for a minute.
+            for (int i = 0; i < 60; i++) {
+                System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
+                Thread.sleep(1000);
+            }
 
-        cluster.killTopology("kafkaBolt");
-        cluster.killTopology("wordCounter");
-        cluster.shutdown();
+            cluster.killTopology("kafkaBolt");
+            cluster.killTopology("wordCounter");
+            cluster.shutdown();
+        }
     }
 }
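
The javadoc above says that in distributed mode the word counts can be fetched by running an external drpc query against the drpc server. Below is a minimal sketch of such a query, assuming a DRPC server started with "storm drpc", drpc.servers/drpc.port set in storm.yaml, and Storm 1.x's org.apache.storm.utils.DRPCClient; the class name WordCountDrpcQuery and its argument handling are illustrative only and not part of this commit.

import java.util.Map;

import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.Utils;

// Hypothetical standalone client, not part of this commit.
public class WordCountDrpcQuery {
    public static void main(String[] args) throws Exception {
        // DRPC server host and port; 3772 is Storm's default drpc.port.
        String drpcHost = args.length > 0 ? args[0] : "localhost";
        int drpcPort = args.length > 1 ? Integer.parseInt(args[1]) : 3772;

        // Read storm.yaml plus defaults so the Thrift client picks up any transport settings.
        Map conf = Utils.readStormConfig();

        DRPCClient client = new DRPCClient(conf, drpcHost, drpcPort);
        try {
            // "words" is the DRPC function used by the consumer topology,
            // as seen in the local-mode query in the diff above.
            System.out.println("DRPC RESULT: " + client.execute("words", "the and apple snow jumped"));
        } finally {
            client.close();
        }
    }
}

Run against the consumer topology submitted above (e.g. "wordcount-consumer"), this prints the same kind of output as the local-mode query loop.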
