Github user Ethanlm commented on a diff in the pull request:
https://github.com/apache/storm/pull/2641#discussion_r183402775
--- Diff:
examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java
---
@@ -38,176 +47,66 @@
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.StringTokenizer;
-
public class BlobStoreAPIWordCountTopology {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
private static ClientBlobStore store; // Client API to invoke blob
store API functionality
private static String key = "key";
private static String fileName = "blacklist.txt";
- private static final Logger LOG =
LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
public static void prepare() {
Config conf = new Config();
conf.putAll(Utils.readStormConfig());
store = Utils.getClientBlobStore(conf);
}
- // Spout implementation
- public static class RandomSentenceSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
-
- @Override
- public void open(Map<String, Object> conf, TopologyContext
context, SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- _collector.emit(new Values(getRandomSentence()));
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
-
- }
-
- // Bolt implementation
- public static class SplitSentence extends ShellBolt implements
IRichBolt {
-
- public SplitSentence() {
- super("python", "splitsentence.py");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
- public static class FilterWords extends BaseBasicBolt {
- boolean poll = false;
- long pollTime;
- Set<String> wordSet;
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- // Thread Polling every 5 seconds to update the wordSet
seconds which is
- // used in FilterWords bolt to filter the words
- try {
- if (!poll) {
- wordSet = parseFile(fileName);
- pollTime = System.currentTimeMillis();
- poll = true;
- } else {
- if ((System.currentTimeMillis() - pollTime) > 5000) {
- wordSet = parseFile(fileName);
- pollTime = System.currentTimeMillis();
- }
- }
- } catch (IOException exp) {
- throw new RuntimeException(exp);
- }
- if (wordSet !=null && !wordSet.contains(word)) {
- collector.emit(new Values(word));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
-
- public void buildAndLaunchWordCountTopology(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomSentenceSpout(), 5);
- builder.setBolt("split", new SplitSentence(),
8).shuffleGrouping("spout");
- builder.setBolt("filter", new FilterWords(),
6).shuffleGrouping("split");
-
- Config conf = new Config();
- conf.setDebug(true);
- try {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
builder.createTopology());
- } catch (InvalidTopologyException | AuthorizationException |
AlreadyAliveException exp) {
- throw new RuntimeException(exp);
- }
- }
-
--- End diff --
Why are we moving locations of the functions
---