Repository: storm Updated Branches: refs/heads/1.0.x-branch 748adc644 -> 8ae86e153
Merge branch 'STORM-1868' of https://github.com/arunmahadevan/storm into STORM-1868 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ed7a09a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ed7a09a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ed7a09a Branch: refs/heads/1.0.x-branch Commit: 0ed7a09ae8a1ca834dd2ac5cc48b871f2a890459 Parents: 748adc6 Author: Sriharsha Chintalapani <har...@hortonworks.com> Authored: Fri May 27 14:14:16 2016 -0700 Committer: Sriharsha Chintalapani <har...@hortonworks.com> Committed: Fri May 27 14:18:47 2016 -0700 ---------------------------------------------------------------------- .../starter/trident/TridentKafkaWordCount.java | 67 ++++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0ed7a09a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java index dc4cb4b..2b25fad 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java @@ -26,6 +26,7 @@ package org.apache.storm.starter.trident; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.LocalDRPC; +import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache.storm.topology.TopologyBuilder; @@ -182,17 +183,33 @@ public class TridentKafkaWordCount { * <p> * To run this topology ensure you 
have a kafka broker running. * </p> - * Create a topic test with command line, + * Create a topic 'test' with command line, + * <pre> * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test + * </pre> + * To run in local mode, + * <pre> + * storm jar storm-starter-topologies-{version}.jar org.apache.storm.starter.trident.TridentKafkaWordCount + * </pre> + * This will also run a local DRPC query and print the word counts. + * <p> + * To run in distributed mode, run it with a topology name. You will also need to start a drpc server and + * specify the drpc server details in storm.yaml before submitting the topology. + * </p> + * <pre> + * storm jar storm-starter-topologies-{version}.jar org.apache.storm.starter.trident.TridentKafkaWordCount zkhost:port broker:port wordcount + * </pre> + * This will submit two topologies, one for the producer and another for the consumer. You can query the results + * (word counts) by running an external drpc query against the drpc server. */ public static void main(String[] args) throws Exception { String zkUrl = "localhost:2181"; // the defaults. 
String brokerUrl = "localhost:9092"; - if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) { - System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]"); - System.out.println(" E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]"); + if (args.length > 3 || (args.length == 1 && args[0].matches("^-h|--help$"))) { + System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url] [topology name]"); + System.out.println(" E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "] [wordcount]"); System.exit(1); } else if (args.length == 1) { zkUrl = args[0]; @@ -205,25 +222,35 @@ public class TridentKafkaWordCount { TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl); - LocalDRPC drpc = new LocalDRPC(); - LocalCluster cluster = new LocalCluster(); + if (args.length == 3) { + Config conf = new Config(); + conf.setMaxSpoutPending(20); + conf.setNumWorkers(1); + // submit the consumer topology. + StormSubmitter.submitTopology(args[2] + "-consumer", conf, wordCount.buildConsumerTopology(null)); + // submit the producer topology. + StormSubmitter.submitTopology(args[2] + "-producer", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); + } else { + LocalDRPC drpc = new LocalDRPC(); + LocalCluster cluster = new LocalCluster(); - // submit the consumer topology. - cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc)); + // submit the consumer topology. + cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc)); - Config conf = new Config(); - conf.setMaxSpoutPending(20); - // submit the producer topology. - cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); + Config conf = new Config(); + conf.setMaxSpoutPending(20); + // submit the producer topology. 
+ cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); - // keep querying the word counts for a minute. - for (int i = 0; i < 60; i++) { - System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); - Thread.sleep(1000); - } + // keep querying the word counts for a minute. + for (int i = 0; i < 60; i++) { + System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); + Thread.sleep(1000); + } - cluster.killTopology("kafkaBolt"); - cluster.killTopology("wordCounter"); - cluster.shutdown(); + cluster.killTopology("kafkaBolt"); + cluster.killTopology("wordCounter"); + cluster.shutdown(); + } } }