http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java deleted file mode 100644 index 250c418..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.blobstore.AtomicOutputStream; -import backtype.storm.blobstore.ClientBlobStore; -import backtype.storm.blobstore.InputStreamWithMeta; -import backtype.storm.blobstore.NimbusBlobStore; - -import backtype.storm.generated.AccessControl; -import backtype.storm.generated.AccessControlType; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.KeyAlreadyExistsException; -import backtype.storm.generated.KeyNotFoundException; -import backtype.storm.generated.SettableBlobMeta; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.ShellBolt; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.blobstore.BlobStoreAclHandler; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.StringTokenizer; - -public class BlobStoreAPIWordCountTopology { - private static ClientBlobStore store; // Client API to invoke blob store API functionality - private static String key = "key"; - private static String fileName = "blacklist.txt"; - private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); - - public static void prepare() { - Config conf = new Config(); - conf.putAll(Utils.readStormConfig()); - store = Utils.getClientBlobStore(conf); - } - - // Spout implementation - public static class RandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - } - - @Override - public void nextTuple() { - Utils.sleep(100); - _collector.emit(new Values(getRandomSentence())); - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - - } - - // Bolt implementation - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("python", "splitsentence.py"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class FilterWords extends BaseBasicBolt { - boolean poll = false; - long pollTime; - Set<String> wordSet; - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - // Thread Polling every 5 seconds to update the wordSet seconds which is - // used in FilterWords bolt to filter the words - try { - if (!poll) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - poll = true; - } else { - if ((System.currentTimeMillis() - pollTime) > 5000) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - } - } - } catch (IOException exp) { - throw new RuntimeException(exp); - } - if (wordSet !=null && !wordSet.contains(word)) { - collector.emit(new Values(word)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - public void buildAndLaunchWordCountTopology(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 5); - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split"); - - Config conf = new Config(); - conf.setDebug(true); - try { - conf.setNumWorkers(3); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) { - throw new RuntimeException(exp); - } - } - - // Equivalent create command on command line - // storm blobstore create --file blacklist.txt --acl o::rwa key - private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) - throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException { - String stringBlobACL = "o::rwa"; - AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL); - List<AccessControl> acls = new LinkedList<AccessControl>(); - acls.add(blobACL); // more ACLs can be added here - SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls); - AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta); - blobStream.write(readFile(file).toString().getBytes()); - blobStream.close(); - } - - // Equivalent update command on command line - // storm blobstore update --file blacklist.txt key - private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) - throws KeyNotFoundException, AuthorizationException, IOException { - AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey); - blobOutputStream.write(readFile(file).toString().getBytes()); - blobOutputStream.close(); - } - - private static String getRandomSentence() { - String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", - "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; - String sentence = sentences[new Random().nextInt(sentences.length)]; - return sentence; - } - - private static Set<String> getRandomWordSet() { - Set<String> randomWordSet = new HashSet<>(); - Random random = new Random(); - String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away", - "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" }; - // Choosing atmost 5 words to update the blacklist file for filtering - for (int i=0; i<5; i++) { - randomWordSet.add(words[random.nextInt(words.length)]); - } - return randomWordSet; - } - - private static Set<String> parseFile(String fileName) throws IOException { - File file = new File(fileName); - Set<String> wordSet = new HashSet<>(); - if (!file.exists()) { - return wordSet; - } - StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n"); - while (tokens.hasMoreElements()) { - wordSet.add(tokens.nextToken()); - } - LOG.debug("parseFile {}", wordSet); - return wordSet; - } - - private static StringBuilder readFile(File file) throws IOException { - String line; - StringBuilder fileContent = new StringBuilder(); - // Do not use canonical file name here as we are using - // symbolic links to read file data and performing atomic move - // while updating files - BufferedReader br = new BufferedReader(new FileReader(file)); - while ((line = br.readLine()) != null) { - fileContent.append(line); - fileContent.append(System.lineSeparator()); - } - return fileContent; - } - - // Creating a blacklist file to read from the disk - public static File createFile(String fileName) throws IOException { - File file = null; - file = new File(fileName); - if (!file.exists()) { - file.createNewFile(); - } - writeToFile(file, getRandomWordSet()); - return file; - } - - // Updating a blacklist file periodically with random words - public static File updateFile(File file) throws IOException { - writeToFile(file, getRandomWordSet()); - return file; - } - - // Writing random words to be blacklisted - public static void writeToFile(File file, Set<String> content) throws IOException{ - FileWriter fw = new FileWriter(file, false); - BufferedWriter bw = new BufferedWriter(fw); - Iterator<String> iter = content.iterator(); - while(iter.hasNext()) { - bw.write(iter.next()); - bw.write(System.lineSeparator()); - } - bw.close(); - } - - public static void main(String[] args) { - prepare(); - BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology(); - try { - File file = createFile(fileName); - // Creating blob again before launching topology - createBlobWithContent(key, store, file); - - // Blostore launch command with topology blobstore map - // Here we are giving it a local name so that we can read from the file - // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar - // storm.starter.BlobStoreAPIWordCountTopology bl -c - // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}' - wc.buildAndLaunchWordCountTopology(args); - - // Updating file few times every 5 seconds - for(int i=0; i<10; i++) { - updateBlobWithContent(key, store, updateFile(file)); - Utils.sleep(5000); - } - } catch (KeyAlreadyExistsException kae) { - LOG.info("Key already exists {}", kae); - } catch (AuthorizationException | KeyNotFoundException | IOException exp) { - throw new RuntimeException(exp); - } - } -} - -
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java deleted file mode 100644 index d7b1b3e..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import java.util.Map; - -/** - * This is a basic example of a Storm topology. - */ -public class ExclamationTopology { - - public static class ExclamationBolt extends BaseRichBolt { - OutputCollector _collector; - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - @Override - public void execute(Tuple tuple) { - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - - } - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("word", new TestWordSpout(), 10); - builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); - builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); - - Config conf = new Config(); - conf.setDebug(true); - - if (args != null && args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } - else { - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(10000); - cluster.killTopology("test"); - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java deleted file mode 100644 index 8f78abd..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.*; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -/** - * WordCount but teh spout does not stop, and the bolts are implemented in - * java. This can show how fast the word count can run. - */ -public class FastWordCountTopology { - public static class FastRandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - Random _rand; - private static final String[] CHOICES = { - "marry had a little lamb whos fleese was white as snow", - "and every where that marry went the lamb was sure to go", - "one two three four five six seven eight nine ten", - "this is a test of the emergency broadcast system this is only a test", - "peter piper picked a peck of pickeled peppers" - }; - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = ThreadLocalRandom.current(); - } - - @Override - public void nextTuple() { - String sentence = CHOICES[_rand.nextInt(CHOICES.length)]; - _collector.emit(new Values(sentence), sentence); - } - - @Override - public void ack(Object id) { - //Ignored - } - - @Override - public void fail(Object id) { - _collector.emit(new Values(id), id); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - } - - public static class SplitSentence extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { - collector.emit(new Values(word, 1)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void printMetrics(Nimbus.Client client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; - } - } - } - double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); - } - - public static void kill(Nimbus.Client client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new FastRandomSentenceSpout(), 4); - - builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); - - String name = "wc-test"; - if (args != null && args.length > 0) { - name = args[0]; - } - - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); - - Map clusterConf = Utils.readStormConfig(); - clusterConf.putAll(Utils.readCommandLineOpts()); - Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - - //Sleep for 5 mins - for (int i = 0; i < 10; i++) { - Thread.sleep(30 * 1000); - printMetrics(client, name); - } - kill(client, name); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java deleted file mode 100644 index 5df0688..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.*; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.FailedException; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -public class InOrderDeliveryTest { - public static class InOrderSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - int _base = 0; - int _i = 0; - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _base = context.getThisTaskIndex(); - } - - @Override - public void nextTuple() { - Values v = new Values(_base, _i); - _collector.emit(v, "ACK"); - _i++; - } - - @Override - public void ack(Object id) { - //Ignored - } - - @Override - public void fail(Object id) { - //Ignored - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("c1", "c2")); - } - } - - public static class Check extends BaseBasicBolt { - Map<Integer, Integer> expected = new HashMap<Integer, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - Integer c1 = tuple.getInteger(0); - Integer c2 = tuple.getInteger(1); - Integer exp = expected.get(c1); - if (exp == null) exp = 0; - if (c2.intValue() != exp.intValue()) { - System.out.println(c1+" "+c2+" != "+exp); - throw new FailedException(c1+" "+c2+" != "+exp); - } - exp = c2 + 1; - expected.put(c1, exp); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - //Empty - } - } - - public static void printMetrics(Nimbus.Client client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; - } - } - } - double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); - } - - public static void kill(Nimbus.Client client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new InOrderSpout(), 8); - builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); - - Config conf = new Config(); - conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); - - String name = "in-order-test"; - if (args != null && args.length > 0) { - name = args[0]; - } - - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); - - Map clusterConf = Utils.readStormConfig(); - clusterConf.putAll(Utils.readCommandLineOpts()); - Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - - //Sleep for 50 mins - for (int i = 0; i < 50; i++) { - Thread.sleep(30 * 1000); - printMetrics(client, name); - } - kill(client, name); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java deleted file mode 100644 index fe0bae2..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.drpc.DRPCSpout; -import backtype.storm.drpc.ReturnResults; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - - -public class ManualDRPC { - public static class ExclamationBolt extends BaseBasicBolt { - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("result", "return-info")); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String arg = tuple.getString(0); - Object retInfo = tuple.getValue(1); - collector.emit(new Values(arg + "!!!", retInfo)); - } - - } - - public static void main(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); - LocalDRPC drpc = new LocalDRPC(); - - DRPCSpout spout = new DRPCSpout("exclamation", drpc); - builder.setSpout("drpc", spout); - builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc"); - builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim"); - - LocalCluster cluster = new LocalCluster(); - Config conf = new Config(); - cluster.submitTopology("exclaim", conf, builder.createTopology()); - - System.out.println(drpc.execute("exclamation", "aaa")); - System.out.println(drpc.execute("exclamation", "bbb")); - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java deleted file mode 100644 index 4285ff9..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * This is a basic example of a Storm topology. - */ -public class MultipleLoggerTopology { - public static class ExclamationLoggingBolt extends BaseRichBolt { - OutputCollector _collector; - Logger _rootLogger = LoggerFactory.getLogger (Logger.ROOT_LOGGER_NAME); - // ensure the loggers are configured in the worker.xml before - // trying to use them here - Logger _logger = LoggerFactory.getLogger ("com.myapp"); - Logger _subLogger = LoggerFactory.getLogger ("com.myapp.sub"); - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - @Override - public void execute(Tuple tuple) { - _rootLogger.debug ("root: This is a DEBUG message"); - _rootLogger.info ("root: This is an INFO message"); - _rootLogger.warn ("root: This is a WARN message"); - _rootLogger.error ("root: This is an ERROR message"); - - _logger.debug ("myapp: This is a DEBUG message"); - _logger.info ("myapp: This is an INFO message"); - _logger.warn ("myapp: This is a WARN message"); - _logger.error ("myapp: This is an ERROR message"); - - _subLogger.debug ("myapp.sub: This is a DEBUG message"); - _subLogger.info ("myapp.sub: This is an INFO message"); - _subLogger.warn ("myapp.sub: This is a WARN message"); - _subLogger.error ("myapp.sub: This is an ERROR message"); - - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("word", new TestWordSpout(), 10); - builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word"); - builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1"); - - Config conf = new Config(); - conf.setDebug(true); - - if (args != null && args.length > 0) { - conf.setNumWorkers(2); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } else { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(10000); - cluster.killTopology("test"); - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java b/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java deleted file mode 100644 index 021cc17..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package storm.starter; - -import java.util.Arrays; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.utils.Utils; - -import storm.starter.bolt.PrinterBolt; -import storm.starter.spout.TwitterSampleSpout; - -public class PrintSampleStream { - public static void main(String[] args) { - String consumerKey = args[0]; - String consumerSecret = args[1]; - String accessToken = args[2]; - String accessTokenSecret = args[3]; - String[] arguments = args.clone(); - String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret, - accessToken, accessTokenSecret, keyWords)); - builder.setBolt("print", new PrinterBolt()) - .shuffleGrouping("twitter"); - - - Config conf = new Config(); - - - LocalCluster cluster = new LocalCluster(); - - cluster.submitTopology("test", conf, builder.createTopology()); - - Utils.sleep(10000); - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java deleted file mode 100644 index 73ed45a..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.StormSubmitter; -import backtype.storm.coordination.BatchOutputCollector; -import backtype.storm.drpc.LinearDRPCTopologyBuilder; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseBatchBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.*; - -/** - * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can - * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation. - * <p/> - * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people - * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the - * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower - * records. - * <p/> - * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes - * minutes on a single machine into one that takes just a couple seconds. - * <p/> - * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps. - * - * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a> - */ -public class ReachTopology { - public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ - put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); - put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); - put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); - }}; - - public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ - put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); - put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); - put("tim", Arrays.asList("alex")); - put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); - put("adam", Arrays.asList("david", "carissa")); - put("mike", Arrays.asList("john", "bob")); - put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); - }}; - - public static class GetTweeters extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - Object id = tuple.getValue(0); - String url = tuple.getString(1); - List<String> tweeters = TWEETERS_DB.get(url); - if (tweeters != null) { - for (String tweeter : tweeters) { - collector.emit(new Values(id, tweeter)); - } - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "tweeter")); - } - } - - public static class GetFollowers extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - Object id = tuple.getValue(0); - String tweeter = tuple.getString(1); - List<String> followers = FOLLOWERS_DB.get(tweeter); - if (followers != null) { - for (String follower : followers) { - collector.emit(new Values(id, follower)); - } - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "follower")); - } - } - - public static class PartialUniquer extends BaseBatchBolt { - BatchOutputCollector _collector; - Object _id; - Set<String> _followers = new HashSet<String>(); - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { - _collector = collector; - _id = id; - } - - @Override - public void execute(Tuple tuple) { - _followers.add(tuple.getString(1)); - } - - @Override - public void finishBatch() { - _collector.emit(new Values(_id, _followers.size())); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "partial-count")); - } - } - - public static class CountAggregator extends BaseBatchBolt { - BatchOutputCollector _collector; - Object _id; - int _count = 0; - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { - _collector = collector; - _id = id; - } - - @Override - public void execute(Tuple tuple) { - _count += tuple.getInteger(1); - } - - @Override - public void finishBatch() { - _collector.emit(new Values(_id, _count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "reach")); - } - } - - public static LinearDRPCTopologyBuilder construct() { - LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); - builder.addBolt(new GetTweeters(), 4); - builder.addBolt(new GetFollowers(), 12).shuffleGrouping(); - builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower")); - builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id")); - return builder; - } - - public static void main(String[] args) throws Exception { - LinearDRPCTopologyBuilder builder = construct(); - - - Config conf = new Config(); - - if (args == null || args.length == 0) { - conf.setMaxTaskParallelism(3); - LocalDRPC drpc = new LocalDRPC(); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc)); - - String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" }; - for (String url : urlsToTry) { - System.out.println("Reach of " + url + ": " + drpc.execute("reach", url)); - } - - cluster.shutdown(); - drpc.shutdown(); - } - else { - conf.setNumWorkers(6); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology()); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java deleted file mode 100644 index 0fb3724..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.SpoutDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import java.util.Map; - -public class ResourceAwareExampleTopology { - public static class ExclamationBolt extends BaseRichBolt { - OutputCollector _collector; - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - @Override - public void execute(Tuple tuple) { - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10); - //set cpu requirement - spout.setCPULoad(20); - //set onheap and offheap memory requirement - spout.setMemoryLoad(64, 16); - - BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); - //sets cpu requirement. Not neccessary to set both CPU and memory. - //For requirements not set, a default value will be used - bolt1.setCPULoad(15); - - BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); - bolt2.setMemoryLoad(100); - - Config conf = new Config(); - conf.setDebug(true); - - /** - * Use to limit the maximum amount of memory (in MB) allocated to one worker process. - * Can be used to spread executors to to multiple workers - */ - conf.setTopologyWorkerMaxHeapSize(1024.0); - - //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases). - //Recommended range of 0-29 but no hard limit set. - conf.setTopologyPriority(29); - - // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy - conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class); - - if (args != null && args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } - else { - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(10000); - cluster.killTopology("test"); - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java deleted file mode 100644 index 762c22a..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import org.apache.log4j.Logger; -import storm.starter.bolt.IntermediateRankingsBolt; -import storm.starter.bolt.RollingCountBolt; -import storm.starter.bolt.TotalRankingsBolt; -import storm.starter.util.StormRunner; - -/** - * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality. - * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things - * like trending topics or trending images on Twitter. - */ -public class RollingTopWords { - - private static final Logger LOG = Logger.getLogger(RollingTopWords.class); - private static final int DEFAULT_RUNTIME_IN_SECONDS = 60; - private static final int TOP_N = 5; - - private final TopologyBuilder builder; - private final String topologyName; - private final Config topologyConfig; - private final int runtimeInSeconds; - - public RollingTopWords(String topologyName) throws InterruptedException { - builder = new TopologyBuilder(); - this.topologyName = topologyName; - topologyConfig = createTopologyConfiguration(); - runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS; - - wireTopology(); - } - - private static Config createTopologyConfiguration() { - Config conf = new Config(); - conf.setDebug(true); - return conf; - } - - private void wireTopology() throws InterruptedException { - String spoutId = "wordGenerator"; - String counterId = "counter"; - String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; - builder.setSpout(spoutId, new TestWordSpout(), 5); - builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); - builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields( - "obj")); - builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); - } - - public void runLocally() throws InterruptedException { - StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds); - } - - public void runRemotely() throws Exception { - StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig); - } - - /** - * Submits (runs) the topology. - * - * Usage: "RollingTopWords [topology-name] [local|remote]" - * - * By default, the topology is run locally under the name "slidingWindowCounts". - * - * Examples: - * - * ``` - * - * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords - * - * # Runs in local mode (LocalCluster), with topology name "foobar" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar - * - * # Runs in local mode (LocalCluster), with topology name "foobar" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local - * - * # Runs in remote/cluster mode, with topology name "production-topology" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote - * ``` - * - * @param args First positional argument (optional) is topology name, second positional argument (optional) defines - * whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote"). - * @throws Exception - */ - public static void main(String[] args) throws Exception { - String topologyName = "slidingWindowCounts"; - if (args.length >= 1) { - topologyName = args[0]; - } - boolean runLocally = true; - if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) { - runLocally = false; - } - - LOG.info("Topology name: " + topologyName); - RollingTopWords rtw = new RollingTopWords(topologyName); - if (runLocally) { - LOG.info("Running in local mode"); - rtw.runLocally(); - } - else { - LOG.info("Running in remote (cluster) mode"); - rtw.runRemotely(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java deleted file mode 100644 index cb1d98c..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.testing.FeederSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import storm.starter.bolt.SingleJoinBolt; - -public class SingleJoinExample { - public static void main(String[] args) { - FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); - FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("gender", genderSpout); - builder.setSpout("age", ageSpout); - builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) - .fieldsGrouping("age", new Fields("id")); - - Config conf = new Config(); - conf.setDebug(true); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("join-example", conf, builder.createTopology()); - - for (int i = 0; i < 10; i++) { - String gender; - if (i % 2 == 0) { - gender = "male"; - } - else { - gender = "female"; - } - genderSpout.feed(new Values(i, gender)); - } - - for (int i = 9; i >= 0; i--) { - ageSpout.feed(new Values(i, i + 20)); - } - - Utils.sleep(2000); - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java deleted file mode 100644 index 443c051..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.testing.TestWordSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import org.apache.log4j.Logger; -import storm.starter.bolt.IntermediateRankingsBolt; -import storm.starter.bolt.RollingCountBolt; -import storm.starter.bolt.RollingCountAggBolt; -import storm.starter.bolt.TotalRankingsBolt; -import storm.starter.util.StormRunner; - -/** - * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality. - * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things - * like trending topics or trending images on Twitter. It takes an approach that assumes that some works will be much - * more common then other words, and uses partialKeyGrouping to better balance the skewed load. - */ -public class SkewedRollingTopWords { - private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class); - private static final int DEFAULT_RUNTIME_IN_SECONDS = 60; - private static final int TOP_N = 5; - - private final TopologyBuilder builder; - private final String topologyName; - private final Config topologyConfig; - private final int runtimeInSeconds; - - public SkewedRollingTopWords(String topologyName) throws InterruptedException { - builder = new TopologyBuilder(); - this.topologyName = topologyName; - topologyConfig = createTopologyConfiguration(); - runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS; - - wireTopology(); - } - - private static Config createTopologyConfiguration() { - Config conf = new Config(); - conf.setDebug(true); - return conf; - } - - private void wireTopology() throws InterruptedException { - String spoutId = "wordGenerator"; - String counterId = "counter"; - String aggId = "aggregator"; - String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; - builder.setSpout(spoutId, new TestWordSpout(), 5); - builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word")); - builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj")); - builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj")); - builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); - } - - public void runLocally() throws InterruptedException { - StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds); - } - - public void runRemotely() throws Exception { - StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig); - } - - /** - * Submits (runs) the topology. - * - * Usage: "RollingTopWords [topology-name] [local|remote]" - * - * By default, the topology is run locally under the name "slidingWindowCounts". - * - * Examples: - * - * ``` - * - * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords - * - * # Runs in local mode (LocalCluster), with topology name "foobar" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar - * - * # Runs in local mode (LocalCluster), with topology name "foobar" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local - * - * # Runs in remote/cluster mode, with topology name "production-topology" - * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote - * ``` - * - * @param args First positional argument (optional) is topology name, second positional argument (optional) defines - * whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote"). - * @throws Exception - */ - public static void main(String[] args) throws Exception { - String topologyName = "slidingWindowCounts"; - if (args.length >= 1) { - topologyName = args[0]; - } - boolean runLocally = true; - if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) { - runLocally = false; - } - - LOG.info("Topology name: " + topologyName); - SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName); - if (runLocally) { - LOG.info("Running in local mode"); - rtw.runLocally(); - } - else { - LOG.info("Running in remote (cluster) mode"); - rtw.runRemotely(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java deleted file mode 100644 index 598335d..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseWindowedBolt; -import backtype.storm.utils.Utils; -import storm.starter.bolt.PrinterBolt; -import storm.starter.bolt.SlidingWindowSumBolt; -import storm.starter.spout.RandomIntegerSpout; - -import java.util.concurrent.TimeUnit; - -import static backtype.storm.topology.base.BaseWindowedBolt.Duration; - -/** - * Windowing based on tuple timestamp (e.g. the time when tuple is generated - * rather than when its processed). - */ -public class SlidingTupleTsTopology { - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - BaseWindowedBolt bolt = new SlidingWindowSumBolt() - .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)) - .withTimestampField("ts") - .withLag(new Duration(5, TimeUnit.SECONDS)); - builder.setSpout("integer", new RandomIntegerSpout(), 1); - builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer"); - builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum"); - Config conf = new Config(); - conf.setDebug(true); - - if (args != null && args.length > 0) { - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } else { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(40000); - cluster.killTopology("test"); - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java deleted file mode 100644 index 5031f8d..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseWindowedBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import backtype.storm.windowing.TupleWindow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.starter.bolt.PrinterBolt; -import storm.starter.bolt.SlidingWindowSumBolt; -import storm.starter.spout.RandomIntegerSpout; - -import java.util.List; -import java.util.Map; - -import static backtype.storm.topology.base.BaseWindowedBolt.Count; - -/** - * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt} - * to calculate sliding window sum. - */ -public class SlidingWindowTopology { - - private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class); - - /* - * Computes tumbling window average - */ - private static class TumblingWindowAvgBolt extends BaseWindowedBolt { - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(TupleWindow inputWindow) { - int sum = 0; - List<Tuple> tuplesInWindow = inputWindow.get(); - LOG.debug("Events in current window: " + tuplesInWindow.size()); - if (tuplesInWindow.size() > 0) { - /* - * Since this is a tumbling window calculation, - * we use all the tuples in the window to compute the avg. - */ - for (Tuple tuple : tuplesInWindow) { - sum += (int) tuple.getValue(0); - } - collector.emit(new Values(sum / tuplesInWindow.size())); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("avg")); - } - } - - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("integer", new RandomIntegerSpout(), 1); - builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1) - .shuffleGrouping("integer"); - builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1) - .shuffleGrouping("slidingsum"); - builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg"); - Config conf = new Config(); - conf.setDebug(true); - if (args != null && args.length > 0) { - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } else { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(40000); - cluster.killTopology("test"); - cluster.shutdown(); - } - } -}
