Fixing stylecheck problems with storm-starter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81ec15d1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81ec15d1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81ec15d1 Branch: refs/heads/master Commit: 81ec15d1096cd526b94313661e7b5de7ed1791d0 Parents: 7da98cf Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 22:36:19 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:22:36 2018 -0400 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 2 +- .../apache/storm/starter/AnchoredWordCount.java | 72 +- .../apache/storm/starter/BasicDRPCTopology.java | 49 +- .../starter/BlobStoreAPIWordCountTopology.java | 276 ++++---- .../storm/starter/ExclamationTopology.java | 84 ++- .../storm/starter/FastWordCountTopology.java | 300 ++++----- .../storm/starter/InOrderDeliveryTest.java | 261 ++++---- .../apache/storm/starter/JoinBoltExample.java | 39 +- .../apache/storm/starter/LambdaTopology.java | 24 +- .../org/apache/storm/starter/ManualDRPC.java | 51 +- .../storm/starter/MultipleLoggerTopology.java | 112 ++-- .../starter/PersistentWindowingTopology.java | 99 ++- .../org/apache/storm/starter/ReachTopology.java | 76 +-- .../starter/ResourceAwareExampleTopology.java | 113 ++-- .../apache/storm/starter/RollingTopWords.java | 111 ++- .../apache/storm/starter/SingleJoinExample.java | 26 +- .../storm/starter/SkewedRollingTopWords.java | 113 ++-- .../storm/starter/SlidingTupleTsTopology.java | 30 +- .../storm/starter/SlidingWindowTopology.java | 62 +- .../apache/storm/starter/StatefulTopology.java | 44 +- .../starter/StatefulWindowingTopology.java | 46 +- .../storm/starter/TransactionalGlobalCount.java | 65 +- .../storm/starter/TransactionalWords.java | 102 ++- .../apache/storm/starter/WordCountTopology.java | 115 ++-- .../storm/starter/WordCountTopologyNode.java | 61 +- .../storm/starter/bolt/AbstractRankerBolt.java | 135 ++-- .../starter/bolt/IntermediateRankingsBolt.java | 61 +- .../apache/storm/starter/bolt/PrinterBolt.java | 33 +- .../storm/starter/bolt/RollingCountAggBolt.java | 92 ++- .../storm/starter/bolt/RollingCountBolt.java | 169 +++-- .../storm/starter/bolt/SingleJoinBolt.java | 167 ++--- .../starter/bolt/SlidingWindowSumBolt.java | 48 +- .../storm/starter/bolt/TotalRankingsBolt.java | 63 +- .../storm/starter/spout/RandomIntegerSpout.java | 24 +- .../spout/RandomNumberGeneratorSpout.java | 35 +- .../starter/spout/RandomSentenceSpout.java | 120 ++-- .../storm/starter/streams/AggregateExample.java | 33 +- .../storm/starter/streams/BranchExample.java | 45 +- .../streams/GroupByKeyAndWindowExample.java | 62 +- .../storm/starter/streams/JoinExample.java | 48 +- .../starter/streams/StateQueryExample.java | 53 +- .../starter/streams/StatefulWordCount.java | 59 +- .../starter/streams/TypedTupleExample.java | 19 +- .../starter/streams/WindowedWordCount.java | 70 +- .../storm/starter/streams/WordCountToBolt.java | 51 +- .../tools/NthLastModifiedTimeTracker.java | 75 +-- .../apache/storm/starter/tools/Rankable.java | 35 +- .../starter/tools/RankableObjectWithFields.java | 224 +++---- .../apache/storm/starter/tools/Rankings.java | 238 ++++--- .../starter/tools/SlidingWindowCounter.java | 101 ++- .../storm/starter/tools/SlotBasedCounter.java | 144 ++-- .../starter/trident/DebugMemoryMapState.java | 7 +- .../TridentHBaseWindowingStoreTopology.java | 46 +- .../starter/trident/TridentMapExample.java | 50 +- .../trident/TridentMinMaxOfDevicesTopology.java | 85 ++- .../TridentMinMaxOfVehiclesTopology.java | 83 ++- .../storm/starter/trident/TridentReach.java | 95 ++- .../TridentWindowingInmemoryStoreTopology.java | 43 +- .../storm/starter/trident/TridentWordCount.java | 58 +- .../bolt/IntermediateRankingsBoltTest.java | 244 ++++--- .../starter/bolt/RollingCountBoltTest.java | 178 +++-- .../starter/bolt/TotalRankingsBoltTest.java | 246 ++++--- .../tools/NthLastModifiedTimeTrackerTest.java | 158 +++-- .../tools/RankableObjectWithFieldsTest.java | 489 +++++++------- .../storm/starter/tools/RankingsTest.java | 668 ++++++++++--------- .../starter/tools/SlidingWindowCounterTest.java | 175 +++-- .../starter/tools/SlotBasedCounterTest.java | 320 +++++---- 67 files changed, 3613 insertions(+), 3869 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index d12062d..569daf5 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -249,7 +249,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>1538</maxAllowedViolations> + <maxAllowedViolations>263</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java index cb45024..022f6ad 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.starter; @@ -22,7 +16,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.UUID; - import org.apache.storm.Config; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -39,6 +32,27 @@ import org.apache.storm.utils.Utils; public class AnchoredWordCount extends ConfigurableTopology { + protected int run(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setMaxTaskParallelism(3); + + String topologyName = "word-count"; + + conf.setNumWorkers(3); + + if (args != null && args.length > 0) { + topologyName = args[0]; + } + return submit(topologyName, conf, builder); + } + public static class RandomSentenceSpout extends BaseRichSpout { SpoutOutputCollector collector; Random random; @@ -53,9 +67,11 @@ public class AnchoredWordCount extends ConfigurableTopology { @Override public void nextTuple() { Utils.sleep(10); - String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), - sentence("four score and seven years ago"), - sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + String[] sentences = new String[]{ + sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), + sentence("snow white and the seven dwarfs"), sentence("i am at two with nature") + }; final String sentence = sentences[random.nextInt(sentences.length)]; this.collector.emit(new Values(sentence), UUID.randomUUID()); @@ -79,12 +95,11 @@ public class AnchoredWordCount extends ConfigurableTopology { } } - public static class SplitSentence extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { + for (String word : sentence.split("\\s+")) { collector.emit(new Values(word, 1)); } } @@ -115,25 +130,4 @@ public class AnchoredWordCount extends ConfigurableTopology { declarer.declare(new Fields("word", "count")); } } - - protected int run(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new RandomSentenceSpout(), 4); - - builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.setMaxTaskParallelism(3); - - String topologyName = "word-count"; - - conf.setNumWorkers(3); - - if (args != null && args.length > 0) { - topologyName = args[0]; - } - return submit(topologyName, conf, builder); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java index 325809c..83bad13 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import org.apache.storm.Config; @@ -35,19 +30,6 @@ import org.apache.storm.utils.DRPCClient; * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a> */ public class BasicDRPCTopology { - public static class ExclaimBolt extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String input = tuple.getString(1); - collector.emit(new Values(tuple.getValue(0), input + "!")); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "result")); - } - } - public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); @@ -56,12 +38,12 @@ public class BasicDRPCTopology { String topoName = "DRPCExample"; if (args != null && args.length > 0) { - topoName = args[0]; + topoName = args[0]; } conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology()); - + if (args.length > 1) { try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { for (int i = 1; i < args.length; i++) { @@ -71,4 +53,17 @@ public class BasicDRPCTopology { } } } + + public static class ExclaimBolt extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String input = tuple.getString(1); + collector.emit(new Values(tuple.getValue(0), input + "!")); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("id", "result")); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java index 2fc28d2..caa751a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java @@ -1,27 +1,36 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.StringTokenizer; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.blobstore.AtomicOutputStream; +import org.apache.storm.blobstore.BlobStoreAclHandler; import org.apache.storm.blobstore.ClientBlobStore; - import org.apache.storm.generated.AccessControl; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; @@ -38,7 +47,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.blobstore.BlobStoreAclHandler; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; @@ -46,26 +54,11 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.StringTokenizer; - public class BlobStoreAPIWordCountTopology { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); private static ClientBlobStore store; // Client API to invoke blob store API functionality private static String key = "key"; private static String fileName = "blacklist.txt"; - private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); public static void prepare() { Config conf = new Config(); @@ -73,114 +66,16 @@ public class BlobStoreAPIWordCountTopology { store = Utils.getClientBlobStore(conf); } - // Spout implementation - public static class RandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - } - - @Override - public void nextTuple() { - Utils.sleep(100); - _collector.emit(new Values(getRandomSentence())); - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - - } - - // Bolt implementation - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("python", "splitsentence.py"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class FilterWords extends BaseBasicBolt { - boolean poll = false; - long pollTime; - Set<String> wordSet; - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - // Thread Polling every 5 seconds to update the wordSet seconds which is - // used in FilterWords bolt to filter the words - try { - if (!poll) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - poll = true; - } else { - if ((System.currentTimeMillis() - pollTime) > 5000) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - } - } - } catch (IOException exp) { - throw new RuntimeException(exp); - } - if (wordSet !=null && !wordSet.contains(word)) { - collector.emit(new Values(word)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - public void buildAndLaunchWordCountTopology(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 5); - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split"); - - Config conf = new Config(); - conf.setDebug(true); - try { - conf.setNumWorkers(3); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) { - throw new RuntimeException(exp); - } - } - // Equivalent create command on command line // storm blobstore create --file blacklist.txt --acl o::rwa key private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) - throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException { + throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { String stringBlobACL = "o::rwa"; AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL); List<AccessControl> acls = new LinkedList<AccessControl>(); acls.add(blobACL); // more ACLs can be added here SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls); - AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta); + AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey, settableBlobMeta); blobStream.write(readFile(file).toString().getBytes()); blobStream.close(); } @@ -188,15 +83,17 @@ public class BlobStoreAPIWordCountTopology { // Equivalent update command on command line // storm blobstore update --file blacklist.txt key private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) - throws KeyNotFoundException, AuthorizationException, IOException { + throws KeyNotFoundException, AuthorizationException, IOException { AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey); blobOutputStream.write(readFile(file).toString().getBytes()); blobOutputStream.close(); } private static String getRandomSentence() { - String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", - "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; + String[] sentences = new String[]{ + "the cow jumped over the moon", "an apple a day keeps the doctor away", + "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" + }; String sentence = sentences[new Random().nextInt(sentences.length)]; return sentence; } @@ -204,10 +101,12 @@ public class BlobStoreAPIWordCountTopology { private static Set<String> getRandomWordSet() { Set<String> randomWordSet = new HashSet<>(); Random random = new Random(); - String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away", - "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" }; + String[] words = new String[]{ + "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away", + "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" + }; // Choosing atmost 5 words to update the blacklist file for filtering - for (int i=0; i<5; i++) { + for (int i = 0; i < 5; i++) { randomWordSet.add(words[random.nextInt(words.length)]); } return randomWordSet; @@ -259,11 +158,11 @@ public class BlobStoreAPIWordCountTopology { } // Writing random words to be blacklisted - public static void writeToFile(File file, Set<String> content) throws IOException{ + public static void writeToFile(File file, Set<String> content) throws IOException { FileWriter fw = new FileWriter(file, false); BufferedWriter bw = new BufferedWriter(fw); Iterator<String> iter = content.iterator(); - while(iter.hasNext()) { + while (iter.hasNext()) { bw.write(iter.next()); bw.write(System.lineSeparator()); } @@ -286,7 +185,7 @@ public class BlobStoreAPIWordCountTopology { wc.buildAndLaunchWordCountTopology(args); // Updating file few times every 5 seconds - for(int i=0; i<10; i++) { + for (int i = 0; i < 10; i++) { updateBlobWithContent(key, store, updateFile(file)); Utils.sleep(5000); } @@ -296,6 +195,105 @@ public class BlobStoreAPIWordCountTopology { throw new RuntimeException(exp); } } + + public void buildAndLaunchWordCountTopology(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new RandomSentenceSpout(), 5); + builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); + builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split"); + + Config conf = new Config(); + conf.setDebug(true); + try { + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) { + throw new RuntimeException(exp); + } + } + + // Spout implementation + public static class RandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + + @Override + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + } + + @Override + public void nextTuple() { + Utils.sleep(100); + _collector.emit(new Values(getRandomSentence())); + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } + + } + + // Bolt implementation + public static class SplitSentence extends ShellBolt implements IRichBolt { + + public SplitSentence() { + super("python", "splitsentence.py"); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + } + + public static class FilterWords extends BaseBasicBolt { + boolean poll = false; + long pollTime; + Set<String> wordSet; + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + // Thread Polling every 5 seconds to update the wordSet seconds which is + // used in FilterWords bolt to filter the words + try { + if (!poll) { + wordSet = parseFile(fileName); + pollTime = System.currentTimeMillis(); + poll = true; + } else { + if ((System.currentTimeMillis() - pollTime) > 5000) { + wordSet = parseFile(fileName); + pollTime = System.currentTimeMillis(); + } + } + } catch (IOException exp) { + throw new RuntimeException(exp); + } + if (wordSet != null && !wordSet.contains(word)) { + collector.emit(new Values(word)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java index a691201..73f2067 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.Map; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.testing.TestWordSpout; @@ -35,48 +29,48 @@ import org.apache.storm.tuple.Values; */ public class ExclamationTopology extends ConfigurableTopology { - public static class ExclamationBolt extends BaseRichBolt { - OutputCollector _collector; - - @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; + public static void main(String[] args) throws Exception { + ConfigurableTopology.start(new ExclamationTopology(), args); } - @Override - public void execute(Tuple tuple) { - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } + protected int run(String[] args) { + TopologyBuilder builder = new TopologyBuilder(); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } + builder.setSpout("word", new TestWordSpout(), 10); + builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); + builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); - } + conf.setDebug(true); - public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new ExclamationTopology(), args); - } + String topologyName = "test"; - protected int run(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); + conf.setNumWorkers(3); - builder.setSpout("word", new TestWordSpout(), 10); - builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); - builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); + if (args != null && args.length > 0) { + topologyName = args[0]; + } - conf.setDebug(true); + return submit(topologyName, conf, builder); + } - String topologyName = "test"; + public static class ExclamationBolt extends BaseRichBolt { + OutputCollector _collector; - conf.setNumWorkers(3); + @Override + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { + _collector = collector; + } - if (args != null && args.length > 0) { - topologyName = args[0]; - } + @Override + public void execute(Tuple tuple) { + _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); + _collector.ack(tuple); + } - return submit(topologyName, conf, builder); - } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java index e171557..f881a86 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java @@ -1,25 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.*; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.SpoutStats; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; @@ -30,168 +35,165 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.NimbusClient; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; +import org.apache.storm.utils.Utils; /** * WordCount but the spout does not stop, and the bolts are implemented in * java. This can show how fast the word count can run. */ public class FastWordCountTopology { - public static class FastRandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - Random _rand; - private static final String[] CHOICES = { - "marry had a little lamb whos fleese was white as snow", - "and every where that marry went the lamb was sure to go", - "one two three four five six seven eight nine ten", - "this is a test of the emergency broadcast system this is only a test", - "peter piper picked a peck of pickeled peppers" - }; - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = ThreadLocalRandom.current(); + public static void printMetrics(Nimbus.Iface client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts : summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named " + name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec : info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map<String, Long> failedMap = stats.get_failed().get(":all-time"); + Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); + Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key : ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal / acked; + System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + + (((double) acked) / uptime + " failed: " + failed)); } - @Override - public void nextTuple() { - String sentence = CHOICES[_rand.nextInt(CHOICES.length)]; - _collector.emit(new Values(sentence), sentence); + public static void kill(Nimbus.Iface client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); } - @Override - public void ack(Object id) { - //Ignored - } + public static void main(String[] args) throws Exception { - @Override - public void fail(Object id) { - _collector.emit(new Values(id), id); - } + TopologyBuilder builder = new TopologyBuilder(); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - } - - public static class SplitSentence extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { - collector.emit(new Values(word, 1)); - } - } + builder.setSpout("spout", new FastRandomSentenceSpout(), 4); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void printMetrics(Nimbus.Iface client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; + Config conf = new Config(); + conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class); + + String name = "wc-test"; + if (args != null && args.length > 0) { + name = args[0]; } - } - } - double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); - } - public static void kill(Nimbus.Iface client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); - public static void main(String[] args) throws Exception { + Map<String, Object> clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - TopologyBuilder builder = new TopologyBuilder(); + //Sleep for 5 mins + for (int i = 0; i < 10; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } + + public static class FastRandomSentenceSpout extends BaseRichSpout { + private static final String[] CHOICES = { + "marry had a little lamb whos fleese was white as snow", + "and every where that marry went the lamb was sure to go", + "one two three four five six seven eight nine ten", + "this is a test of the emergency broadcast system this is only a test", + "peter piper picked a peck of pickeled peppers" + }; + SpoutOutputCollector _collector; + Random _rand; + + @Override + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _rand = ThreadLocalRandom.current(); + } - builder.setSpout("spout", new FastRandomSentenceSpout(), 4); + @Override + public void nextTuple() { + String sentence = CHOICES[_rand.nextInt(CHOICES.length)]; + _collector.emit(new Values(sentence), sentence); + } - builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + @Override + public void ack(Object id) { + //Ignored + } - Config conf = new Config(); - conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class); + @Override + public void fail(Object id) { + _collector.emit(new Values(id), id); + } - String name = "wc-test"; - if (args != null && args.length > 0) { - name = args[0]; + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word : sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } - Map<String, Object> clusterConf = Utils.readStormConfig(); - clusterConf.putAll(Utils.readCommandLineOpts()); - Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + public static class WordCount extends BaseBasicBolt { + Map<String, Integer> counts = new HashMap<String, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } - //Sleep for 5 mins - for (int i = 0; i < 10; i++) { - Thread.sleep(30 * 1000); - printMetrics(client, name); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } } - kill(client, name); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java index e4a41e2..5d314e0 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/InOrderDeliveryTest.java @@ -1,25 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.*; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.SpoutStats; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; @@ -31,143 +34,141 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.NimbusClient; - -import java.util.HashMap; -import java.util.Map; +import org.apache.storm.utils.Utils; public class InOrderDeliveryTest { - public static class InOrderSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - int _base = 0; - int _i = 0; - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _base = context.getThisTaskIndex(); + public static void printMetrics(Nimbus.Iface client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts : summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named " + name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec : info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map<String, Long> failedMap = stats.get_failed().get(":all-time"); + Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); + Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key : ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal / acked; + System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + + (((double) acked) / uptime + " failed: " + failed)); } - @Override - public void nextTuple() { - Values v = new Values(_base, _i); - _collector.emit(v, "ACK"); - _i++; + public static void kill(Nimbus.Iface client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); } - @Override - public void ack(Object id) { - //Ignored - } + public static void main(String[] args) throws Exception { - @Override - public void fail(Object id) { - //Ignored - } + TopologyBuilder builder = new TopologyBuilder(); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("c1", "c2")); - } - } - - public static class Check extends BaseBasicBolt { - Map<Integer, Integer> expected = new HashMap<Integer, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - Integer c1 = tuple.getInteger(0); - Integer c2 = tuple.getInteger(1); - Integer exp = expected.get(c1); - if (exp == null) exp = 0; - if (c2.intValue() != exp.intValue()) { - System.out.println(c1+" "+c2+" != "+exp); - throw new FailedException(c1+" "+c2+" != "+exp); - } - exp = c2 + 1; - expected.put(c1, exp); - } + builder.setSpout("spout", new InOrderSpout(), 8); + builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - //Empty - } - } - - public static void printMetrics(Nimbus.Iface client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; + Config conf = new Config(); + conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class); + + String name = "in-order-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + + Map<String, Object> clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Sleep for 50 mins + for (int i = 0; i < 50; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); } - } + kill(client, name); } - double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); - } - public static void kill(Nimbus.Iface client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } + public static class InOrderSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + int _base = 0; + int _i = 0; - public static void main(String[] args) throws Exception { + @Override + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _base = context.getThisTaskIndex(); + } - TopologyBuilder builder = new TopologyBuilder(); + @Override + public void nextTuple() { + Values v = new Values(_base, _i); + _collector.emit(v, "ACK"); + _i++; + } - builder.setSpout("spout", new InOrderSpout(), 8); - builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); + @Override + public void ack(Object id) { + //Ignored + } - Config conf = new Config(); - conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class); + @Override + public void fail(Object id) { + //Ignored + } - String name = "in-order-test"; - if (args != null && args.length > 0) { - name = args[0]; + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("c1", "c2")); + } } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); - - Map<String, Object> clusterConf = Utils.readStormConfig(); - clusterConf.putAll(Utils.readCommandLineOpts()); - Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + public static class Check extends BaseBasicBolt { + Map<Integer, Integer> expected = new HashMap<Integer, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + Integer c1 = tuple.getInteger(0); + Integer c2 = tuple.getInteger(1); + Integer exp = expected.get(c1); + if (exp == null) exp = 0; + if (c2.intValue() != exp.intValue()) { + System.out.println(c1 + " " + c2 + " != " + exp); + throw new FailedException(c1 + " " + c2 + " != " + exp); + } + exp = c2 + 1; + expected.put(c1, exp); + } - //Sleep for 50 mins - for (int i = 0; i < 50; i++) { - Thread.sleep(30 * 1000); - printMetrics(client, name); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + //Empty + } } - kill(client, name); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java index b71b64a..6fc3739 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/JoinBoltExample.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.concurrent.TimeUnit; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.bolt.JoinBolt; @@ -33,8 +27,8 @@ import org.apache.storm.utils.NimbusClient; public class JoinBoltExample { public static void main(String[] args) throws Exception { if (!NimbusClient.isLocalOverride()) { - throw new IllegalStateException("This example only works in local mode. " - + "Run with storm local not storm jar"); + throw new IllegalStateException("This example only works in local mode. " + + "Run with storm local not storm jar"); } FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); @@ -45,15 +39,15 @@ public class JoinBoltExample { // inner join of 'age' and 'gender' records on 'id' field JoinBolt joiner = new JoinBolt("genderSpout", "id") - .join("ageSpout", "id", "genderSpout") - .select ("genderSpout:id,ageSpout:id,gender,age") - .withTumblingWindow( new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS) ); + .join("ageSpout", "id", "genderSpout") + .select("genderSpout:id,ageSpout:id,gender,age") + .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)); builder.setBolt("joiner", joiner) - .fieldsGrouping("genderSpout", new Fields("id")) - .fieldsGrouping("ageSpout", new Fields("id")) ; + .fieldsGrouping("genderSpout", new Fields("id")) + .fieldsGrouping("ageSpout", new Fields("id")); - builder.setBolt("printer", new PrinterBolt() ).shuffleGrouping("joiner"); + builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("joiner"); Config conf = new Config(); StormSubmitter.submitTopologyWithProgressBar("join-example", conf, builder.createTopology()); @@ -74,8 +68,7 @@ public class JoinBoltExample { String gender; if (i % 2 == 0) { gender = "male"; - } - else { + } else { gender = "female"; } genderSpout.feed(new Values(i, gender)); http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index 61b02db..94b1c38 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -1,30 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; +import java.io.Serializable; +import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.topology.ConfigurableTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Values; -import java.io.Serializable; -import java.util.UUID; - public class LambdaTopology extends ConfigurableTopology { public static void main(String[] args) { ConfigurableTopology.start(new LambdaTopology(), args); http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java index 110d0be..2038399 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import org.apache.storm.Config; @@ -31,22 +26,6 @@ import org.apache.storm.tuple.Values; import org.apache.storm.utils.DRPCClient; public class ManualDRPC { - public static class ExclamationBolt extends BaseBasicBolt { - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("result", "return-info")); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String arg = tuple.getString(0); - Object retInfo = tuple.getValue(1); - collector.emit(new Values(arg + "!!!", retInfo)); - } - - } - public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); @@ -62,4 +41,20 @@ public class ManualDRPC { System.out.println(drpc.execute("exclamation", "bbb")); } } + + public static class ExclamationBolt extends BaseBasicBolt { + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("result", "return-info")); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String arg = tuple.getString(0); + Object retInfo = tuple.getValue(1); + collector.emit(new Values(arg + "!!!", retInfo)); + } + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java index aeedc78..954e195 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/MultipleLoggerTopology.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.task.OutputCollector; @@ -37,60 +31,60 @@ import org.slf4j.LoggerFactory; * This is a basic example of a Storm topology. */ public class MultipleLoggerTopology { - public static class ExclamationLoggingBolt extends BaseRichBolt { - OutputCollector _collector; - Logger _rootLogger = LoggerFactory.getLogger (Logger.ROOT_LOGGER_NAME); - // ensure the loggers are configured in the worker.xml before - // trying to use them here - Logger _logger = LoggerFactory.getLogger ("com.myapp"); - Logger _subLogger = LoggerFactory.getLogger ("com.myapp.sub"); + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); - @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; - } + builder.setSpout("word", new TestWordSpout(), 10); + builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word"); + builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1"); - @Override - public void execute(Tuple tuple) { - _rootLogger.debug ("root: This is a DEBUG message"); - _rootLogger.info ("root: This is an INFO message"); - _rootLogger.warn ("root: This is a WARN message"); - _rootLogger.error ("root: This is an ERROR message"); + Config conf = new Config(); + conf.setDebug(true); + String topoName = MultipleLoggerTopology.class.getName(); + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(2); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } - _logger.debug ("myapp: This is a DEBUG message"); - _logger.info ("myapp: This is an INFO message"); - _logger.warn ("myapp: This is a WARN message"); - _logger.error ("myapp: This is an ERROR message"); + public static class ExclamationLoggingBolt extends BaseRichBolt { + OutputCollector _collector; + Logger _rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + // ensure the loggers are configured in the worker.xml before + // trying to use them here + Logger _logger = LoggerFactory.getLogger("com.myapp"); + Logger _subLogger = LoggerFactory.getLogger("com.myapp.sub"); - _subLogger.debug ("myapp.sub: This is a DEBUG message"); - _subLogger.info ("myapp.sub: This is an INFO message"); - _subLogger.warn ("myapp.sub: This is a WARN message"); - _subLogger.error ("myapp.sub: This is an ERROR message"); + @Override + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { + _collector = collector; + } - _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); - _collector.ack(tuple); - } + @Override + public void execute(Tuple tuple) { + _rootLogger.debug("root: This is a DEBUG message"); + _rootLogger.info("root: This is an INFO message"); + _rootLogger.warn("root: This is a WARN message"); + _rootLogger.error("root: This is an ERROR message"); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } + _logger.debug("myapp: This is a DEBUG message"); + _logger.info("myapp: This is an INFO message"); + _logger.warn("myapp: This is a WARN message"); + _logger.error("myapp: This is an ERROR message"); - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); + _subLogger.debug("myapp.sub: This is a DEBUG message"); + _subLogger.info("myapp.sub: This is an INFO message"); + _subLogger.warn("myapp.sub: This is a WARN message"); + _subLogger.error("myapp.sub: This is an ERROR message"); - builder.setSpout("word", new TestWordSpout(), 10); - builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word"); - builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1"); + _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); + _collector.ack(tuple); + } - Config conf = new Config(); - conf.setDebug(true); - String topoName = MultipleLoggerTopology.class.getName(); - if (args != null && args.length > 0) { - topoName = args[0]; + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } } - conf.setNumWorkers(2); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java index 566ab69..46f29a0 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java @@ -18,8 +18,6 @@ package org.apache.storm.starter; -import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; - import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -40,18 +38,18 @@ import org.apache.storm.windowing.TupleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.topology.base.BaseWindowedBolt.Duration; + /** - * An example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} with window - * persistence. + * An example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} with window persistence. * <p> - * The framework automatically checkpoints the tuples in the window along with the bolt's state and restores the same - * during restarts. + * The framework automatically checkpoints the tuples in the window along with the bolt's state and restores the same during restarts. * </p> * * <p> - * This topology uses 'redis' for state persistence, so you should also start a redis instance before deploying. - * If you are running in local mode you can just start a redis server locally which will be used for storing the state. The default - * RedisKeyValueStateProvider parameters can be overridden by setting {@link Config#TOPOLOGY_STATE_PROVIDER_CONFIG}, for e.g. + * This topology uses 'redis' for state persistence, so you should also start a redis instance before deploying. If you are running in local + * mode you can just start a redis server locally which will be used for storing the state. The default RedisKeyValueStateProvider + * parameters can be overridden by setting {@link Config#TOPOLOGY_STATE_PROVIDER_CONFIG}, for e.g. * <pre> * { * "jedisPoolConfig": { @@ -68,6 +66,47 @@ import org.slf4j.LoggerFactory; public class PersistentWindowingTopology { private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowingTopology.class); + /** + * Create and deploy the topology. + * + * @param args args + * @throws Exception exception + */ + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + // generate random numbers + builder.setSpout("spout", new RandomIntegerSpout()); + + // emits sliding window and global averages + builder.setBolt("avgbolt", new AvgBolt() + .withWindow(new Duration(10, TimeUnit.SECONDS), new Duration(2, TimeUnit.SECONDS)) + // persist the window in state + .withPersistence() + // max number of events to be cached in memory + .withMaxEventsInMemory(25000), 1) + .shuffleGrouping("spout"); + + // print the values to stdout + builder.setBolt("printer", (x, y) -> System.out.println(x.getValue(0)), 1).shuffleGrouping("avgbolt"); + + Config conf = new Config(); + conf.setDebug(false); + + // checkpoint the state every 5 seconds + conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 5000); + + // use redis for state persistence + conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider"); + + String topoName = "test"; + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } + // wrapper to hold global and window averages private static class Averages { private final double global; @@ -131,46 +170,4 @@ public class PersistentWindowingTopology { } } - - /** - * Create and deploy the topology. - * - * @param args args - * @throws Exception exception - */ - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - // generate random numbers - builder.setSpout("spout", new RandomIntegerSpout()); - - // emits sliding window and global averages - builder.setBolt("avgbolt", new AvgBolt() - .withWindow(new Duration(10, TimeUnit.SECONDS), new Duration(2, TimeUnit.SECONDS)) - // persist the window in state - .withPersistence() - // max number of events to be cached in memory - .withMaxEventsInMemory(25000), 1) - .shuffleGrouping("spout"); - - // print the values to stdout - builder.setBolt("printer", (x, y) -> System.out.println(x.getValue(0)), 1).shuffleGrouping("avgbolt"); - - Config conf = new Config(); - conf.setDebug(false); - - // checkpoint the state every 5 seconds - conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 5000); - - // use redis for state persistence - conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider"); - - String topoName = "test"; - if (args != null && args.length > 0) { - topoName = args[0]; - } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } - }
