/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.bolt;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import org.apache.storm.starter.tools.NthLastModifiedTimeTracker;
import org.apache.storm.starter.tools.SlidingWindowCounter;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
 * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
 * <p/>
 * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
 * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
 * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
 * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
 * minute.
 * <p/>
 * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
 * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
 * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
 * window length is tracked and calculated for the window, and not individually for each object within a window.
 * <p/>
 * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
 * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
 * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
 * during the first ~ five minutes of startup time if the window length is set to five minutes).
 */
public class RollingCountBolt extends BaseRichBolt {

  private static final long serialVersionUID = 5537727428628598519L;
  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
  // The window is divided into NUM_WINDOW_CHUNKS slots; the counter advances one slot per emit.
  private static final int NUM_WINDOW_CHUNKS = 5;
  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
      "Actual window length is %d seconds when it should be %d seconds"
          + " (you can safely ignore this warning during the startup phase)";

  private final SlidingWindowCounter<Object> counter;
  private final int windowLengthInSeconds;
  private final int emitFrequencyInSeconds;
  private OutputCollector collector;
  // Tracks when each of the last N window slots was advanced, so we can report the
  // actual (vs. configured) window length alongside the counts.
  private NthLastModifiedTimeTracker lastModifiedTracker;

  /** Creates a bolt with a 5-minute window emitted once per minute. */
  public RollingCountBolt() {
    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  /**
   * @param windowLengthInSeconds  length of the sliding window
   * @param emitFrequencyInSeconds how often the current window counts are emitted; must evenly
   *                               divide the window length for the chunking to be exact
   */
  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
    this.windowLengthInSeconds = windowLengthInSeconds;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
        this.emitFrequencyInSeconds));
  }

  // Number of slots = window length / emit frequency (integer division; a remainder is
  // silently truncated -- NOTE(review): callers should pass an exact multiple).
  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
  }

  @SuppressWarnings("rawtypes")
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
        this.emitFrequencyInSeconds));
  }

  /**
   * Tick tuples (scheduled via getComponentConfiguration) trigger an emit-and-advance of the
   * window; all other tuples are counted and acked.
   */
  @Override
  public void execute(Tuple tuple) {
    if (TupleUtils.isTick(tuple)) {
      LOG.debug("Received tick tuple, triggering emit of current window counts");
      emitCurrentWindowCounts();
    }
    else {
      countObjAndAck(tuple);
    }
  }

  // Snapshots the current window counts, advances the window by one slot, and emits one
  // tuple per counted object together with the measured window length.
  private void emitCurrentWindowCounts() {
    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
    lastModifiedTracker.markAsModified();
    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
    }
    emit(counts, actualWindowLengthInSeconds);
  }

  // Emits one (obj, count, actualWindowLengthInSeconds) tuple per object.
  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
    for (Entry<Object, Long> entry : counts.entrySet()) {
      Object obj = entry.getKey();
      Long count = entry.getValue();
      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
    }
  }

  // The object to count is expected in field 0 of the incoming tuple.
  private void countObjAndAck(Tuple tuple) {
    Object obj = tuple.getValue(0);
    counter.incrementCount(obj);
    collector.ack(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
  }

  /** Asks Storm to send this bolt a tick tuple every emitFrequencyInSeconds. */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.bolt;

import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TimeCacheMap;

import java.util.*;

/**
 * Joins tuples arriving from several input streams on their common fields and emits one joined
 * tuple (with the configured output fields) once a matching tuple has been received from every
 * source. Partial joins are buffered in a {@link TimeCacheMap} and failed when they expire
 * without completing (the timeout is taken from TOPOLOGY_MESSAGE_TIMEOUT_SECS).
 */
public class SingleJoinBolt extends BaseRichBolt {
  OutputCollector _collector;
  // Join key: the set of field names common to all input streams.
  Fields _idFields;
  // Fields to emit in the joined tuple, in the order requested by the caller.
  Fields _outFields;
  // Number of distinct input streams; a join is complete when we hold one tuple per stream.
  int _numSources;
  // Partial joins keyed by join-key values; entries expire (and are failed) on timeout.
  // NOTE(review): TimeCacheMap spawns a cleaner thread and is deprecated in newer Storm.
  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
  // Maps each output field to the (single) source stream that provides it.
  Map<String, GlobalStreamId> _fieldLocations;

  public SingleJoinBolt(Fields outFields) {
    _outFields = outFields;
  }

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      // The join key is the intersection of the field sets of all sources.
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      // Record which source supplies each requested output field. If several sources carry the
      // same field name, the last source wins -- the tuples agree on it only if it is a key field.
      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }

  @Override
  public void execute(Tuple tuple) {
    // Extract this tuple's join-key values and remember which stream it came from.
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    // Each stream may contribute at most one tuple per join key.
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      // All sides present: remove from the pending cache, assemble the output values by
      // pulling each field from the stream recorded in _fieldLocations, then anchor the
      // emitted tuple on all contributing tuples and ack them.
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
  }

  /** Fails every buffered tuple of a join that timed out before completing, so spouts replay them. */
  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

/**
 * Maintains the sum of the integer values (field 0) of all tuples in the current
 * sliding window, and emits the updated sum each time the window slides.
 */
public class SlidingWindowSumBolt extends BaseWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    // Running sum over the current window, maintained incrementally across activations.
    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * TupleWindow exposes the full window contents plus the deltas (tuples that
         * expired and tuples that arrived) since the previous activation.
         */
        List<Tuple> windowTuples = inputWindow.get();
        LOG.debug("Events in current window: " + windowTuples.size());

        /*
         * Update the sum incrementally from the deltas rather than re-summing every
         * tuple in the window: drop the expired values, add the new ones.
         */
        for (Tuple expired : inputWindow.getExpired()) {
            sum -= (int) expired.getValue(0);
        }
        for (Tuple arrived : inputWindow.getNew()) {
            sum += (int) arrived.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.tuple.Tuple; +import org.apache.log4j.Logger; +import org.apache.storm.starter.tools.Rankings; + +/** + * This bolt merges incoming {@link Rankings}. + * <p/> + * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final, + * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}. + */ +public final class TotalRankingsBolt extends AbstractRankerBolt { + + private static final long serialVersionUID = -8447525895532302198L; + private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class); + + public TotalRankingsBolt() { + super(); + } + + public TotalRankingsBolt(int topN) { + super(topN); + } + + public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) { + super(topN, emitFrequencyInSeconds); + } + + @Override + void updateRankingsWithTuple(Tuple tuple) { + Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0); + super.getRankings().updateWith(rankingsToBeMerged); + super.getRankings().pruneZeroCounts(); + } + + @Override + Logger getLogger() { + return LOG; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java new file mode 100644 index 0000000..e81ca40 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Every 100 ms, emits a tuple holding a random integer in [0, 1000) and a
 * timestamp set one day in the past. The "ts" field is suitable for tuple
 * time based windowing.
 */
public class RandomIntegerSpout extends BaseRichSpout {

    // Pause between emissions, in milliseconds.
    private static final int EMIT_PERIOD_MS = 100;
    // Emitted timestamps are shifted back by exactly one day.
    private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000;

    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value", "ts"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(EMIT_PERIOD_MS);
        int value = random.nextInt(1000);
        long ts = System.currentTimeMillis() - ONE_DAY_MS;
        collector.emit(new Values(value, ts));
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Emits one randomly chosen sentence (field "word") every 100 ms, from a fixed
 * pool of five sentences. Tuples are emitted without message ids, so the spout
 * is unreliable: ack/fail are no-ops and failed tuples are not replayed.
 */
public class RandomSentenceSpout extends BaseRichSpout {

  // Hoisted to a constant: the original allocated this array on every nextTuple()
  // call, i.e. ten times per second, for no benefit.
  private static final String[] SENTENCES = new String[]{ "the cow jumped over the moon",
      "an apple a day keeps the doctor away", "four score and seven years ago",
      "snow white and the seven dwarfs", "i am at two with nature" };

  SpoutOutputCollector _collector;
  Random _rand;


  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
    // Unreliable spout: nothing to do.
  }

  @Override
  public void fail(Object id) {
    // Unreliable spout: failed tuples are not replayed.
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.starter.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Emits live tweets (field "tweet", a twitter4j {@link Status}) from the Twitter streaming API.
 * If keyWords is empty the public sample stream is used, otherwise a filtered stream tracking
 * the given keywords. Statuses are buffered in a bounded queue (capacity 1000) by the stream
 * listener thread; when the queue is full, new statuses are dropped. Max task parallelism is
 * forced to 1 because the stream connection cannot be shared.
 */
@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {

	SpoutOutputCollector _collector;
	LinkedBlockingQueue<Status> queue = null;
	TwitterStream _twitterStream;
	String consumerKey;
	String consumerSecret;
	String accessToken;
	String accessTokenSecret;
	String[] keyWords;

	public TwitterSampleSpout(String consumerKey, String consumerSecret,
			String accessToken, String accessTokenSecret, String[] keyWords) {
		this.consumerKey = consumerKey;
		this.consumerSecret = consumerSecret;
		this.accessToken = accessToken;
		this.accessTokenSecret = accessTokenSecret;
		this.keyWords = keyWords;
	}

	/**
	 * No-arg constructor for serialization frameworks only; leaves credentials and
	 * keyWords null, so open() will fail if this instance is actually deployed.
	 */
	public TwitterSampleSpout() {
	}

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		queue = new LinkedBlockingQueue<Status>(1000);
		_collector = collector;

		StatusListener listener = new StatusListener() {

			@Override
			public void onStatus(Status status) {
				// offer() drops the status if the queue is full rather than blocking
				// the twitter4j dispatch thread.
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception ex) {
			}

			@Override
			public void onStallWarning(StallWarning arg0) {
			}

		};

		// BUG FIX: the original stored the stream only in a local variable and never
		// assigned the _twitterStream field, so close() threw a NullPointerException.
		_twitterStream = new TwitterStreamFactory(
				new ConfigurationBuilder().setJSONStoreEnabled(true).build())
				.getInstance();

		_twitterStream.addListener(listener);
		_twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
		AccessToken token = new AccessToken(accessToken, accessTokenSecret);
		_twitterStream.setOAuthAccessToken(token);

		if (keyWords.length == 0) {
			// No keywords: consume the unfiltered public sample stream.
			_twitterStream.sample();
		}
		else {
			FilterQuery query = new FilterQuery().track(keyWords);
			_twitterStream.filter(query);
		}

	}

	@Override
	public void nextTuple() {
		// Non-blocking poll; back off briefly when no tweet is buffered so the
		// spout does not busy-spin.
		Status ret = queue.poll();
		if (ret == null) {
			Utils.sleep(50);
		} else {
			_collector.emit(new Values(ret));
		}
	}

	@Override
	public void close() {
		_twitterStream.shutdown();
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// A streaming connection cannot be shared across tasks.
		Config ret = new Config();
		ret.setMaxTaskParallelism(1);
		return ret;
	}

	@Override
	public void ack(Object id) {
	}

	@Override
	public void fail(Object id) {
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tweet"));
	}

}
+ * <p/> + * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just + * been modified. + */ +public class NthLastModifiedTimeTracker { + + private static final int MILLIS_IN_SEC = 1000; + + private final CircularFifoBuffer lastModifiedTimesMillis; + + public NthLastModifiedTimeTracker(int numTimesToTrack) { + if (numTimesToTrack < 1) { + throw new IllegalArgumentException( + "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")"); + } + lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack); + initLastModifiedTimesMillis(); + } + + private void initLastModifiedTimesMillis() { + long nowCached = now(); + for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) { + lastModifiedTimesMillis.add(Long.valueOf(nowCached)); + } + } + + private long now() { + return Time.currentTimeMillis(); + } + + public int secondsSinceOldestModification() { + long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue(); + return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC); + } + + public void markAsModified() { + updateLastModifiedTime(); + } + + private void updateLastModifiedTime() { + lastModifiedTimesMillis.add(now()); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java new file mode 100644 index 0000000..85f2b62 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.tools;

/**
 * An object paired with an occurrence count, ordered by that count so it can be
 * placed in a ranking.
 */
public interface Rankable extends Comparable<Rankable> {

  /** @return the object being ranked */
  Object getObject();

  /** @return the occurrence count of the object */
  long getCount();

  /**
   * Returns a copy of this Rankable.
   * <p/>
   * Note: We do not defensively copy the object wrapped by the Rankable.  It is passed as is,
   * so the copy shares the wrapped object with the original.
   *
   * @return a copy of this Rankable (the Rankable itself is copied defensively, the wrapped
   *         object is not)
   */
  Rankable copy();
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.tools; + +import org.apache.storm.tuple.Tuple; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.io.Serializable; +import java.util.List; + +/** + * This class wraps an objects and its associated count, including any additional data fields. + * <p/> + * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology. + */ +public class RankableObjectWithFields implements Rankable, Serializable { + + private static final long serialVersionUID = -9102878650001058090L; + private static final String toStringSeparator = "|"; + + private final Object obj; + private final long count; + private final ImmutableList<Object> fields; + + public RankableObjectWithFields(Object obj, long count, Object... otherFields) { + if (obj == null) { + throw new IllegalArgumentException("The object must not be null"); + } + if (count < 0) { + throw new IllegalArgumentException("The count must be >= 0"); + } + this.obj = obj; + this.count = count; + fields = ImmutableList.copyOf(otherFields); + + } + + /** + * Construct a new instance based on the provided {@link Tuple}. + * <p/> + * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of + * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be + * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}. 
+ * + * @param tuple + * + * @return new instance based on the provided tuple + */ + public static RankableObjectWithFields from(Tuple tuple) { + List<Object> otherFields = Lists.newArrayList(tuple.getValues()); + Object obj = otherFields.remove(0); + Long count = (Long) otherFields.remove(0); + return new RankableObjectWithFields(obj, count, otherFields.toArray()); + } + + public Object getObject() { + return obj; + } + + public long getCount() { + return count; + } + + /** + * @return an immutable list of any additional data fields of the object (may be empty but will never be null) + */ + public List<Object> getFields() { + return fields; + } + + @Override + public int compareTo(Rankable other) { + long delta = this.getCount() - other.getCount(); + if (delta > 0) { + return 1; + } + else if (delta < 0) { + return -1; + } + else { + return 0; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RankableObjectWithFields)) { + return false; + } + RankableObjectWithFields other = (RankableObjectWithFields) o; + return obj.equals(other.obj) && count == other.count; + } + + @Override + public int hashCode() { + int result = 17; + int countHash = (int) (count ^ (count >>> 32)); + result = 31 * result + countHash; + result = 31 * result + obj.hashCode(); + return result; + } + + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("["); + buf.append(obj); + buf.append(toStringSeparator); + buf.append(count); + for (Object field : fields) { + buf.append(toStringSeparator); + buf.append(field); + } + buf.append("]"); + return buf.toString(); + } + + /** + * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, + * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields. 
+ * + * @return + */ + @Override + public Rankable copy() { + List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields()); + return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java new file mode 100644 index 0000000..17174b3 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.starter.tools; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +public class Rankings implements Serializable { + + private static final long serialVersionUID = -1549827195410578903L; + private static final int DEFAULT_COUNT = 10; + + private final int maxSize; + private final List<Rankable> rankedItems = Lists.newArrayList(); + + public Rankings() { + this(DEFAULT_COUNT); + } + + public Rankings(int topN) { + if (topN < 1) { + throw new IllegalArgumentException("topN must be >= 1"); + } + maxSize = topN; + } + + /** + * Copy constructor. + * @param other + */ + public Rankings(Rankings other) { + this(other.maxSize()); + updateWith(other); + } + + /** + * @return the maximum possible number (size) of ranked objects this instance can hold + */ + public int maxSize() { + return maxSize; + } + + /** + * @return the number (size) of ranked objects this instance is currently holding + */ + public int size() { + return rankedItems.size(); + } + + /** + * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the + * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the + * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within + * a Rankable will be defensively copied, too. 
+ * + * @return a somewhat defensive copy of ranked items + */ + public List<Rankable> getRankings() { + List<Rankable> copy = Lists.newLinkedList(); + for (Rankable r: rankedItems) { + copy.add(r.copy()); + } + return ImmutableList.copyOf(copy); + } + + public void updateWith(Rankings other) { + for (Rankable r : other.getRankings()) { + updateWith(r); + } + } + + public void updateWith(Rankable r) { + synchronized(rankedItems) { + addOrReplace(r); + rerank(); + shrinkRankingsIfNeeded(); + } + } + + private void addOrReplace(Rankable r) { + Integer rank = findRankOf(r); + if (rank != null) { + rankedItems.set(rank, r); + } + else { + rankedItems.add(r); + } + } + + private Integer findRankOf(Rankable r) { + Object tag = r.getObject(); + for (int rank = 0; rank < rankedItems.size(); rank++) { + Object cur = rankedItems.get(rank).getObject(); + if (cur.equals(tag)) { + return rank; + } + } + return null; + } + + private void rerank() { + Collections.sort(rankedItems); + Collections.reverse(rankedItems); + } + + private void shrinkRankingsIfNeeded() { + if (rankedItems.size() > maxSize) { + rankedItems.remove(maxSize); + } + } + + /** + * Removes ranking entries that have a count of zero. + */ + public void pruneZeroCounts() { + int i = 0; + while (i < rankedItems.size()) { + if (rankedItems.get(i).getCount() == 0) { + rankedItems.remove(i); + } + else { + i++; + } + } + } + + public String toString() { + return rankedItems.toString(); + } + + /** + * Creates a (defensive) copy of itself. 
+ */ + public Rankings copy() { + return new Rankings(this); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java new file mode 100644 index 0000000..b95a6a9 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.tools; + +import java.io.Serializable; +import java.util.Map; + +/** + * This class counts objects in a sliding window fashion. + * <p/> + * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment + * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access + * to the counter. 
Whenever the consumer thread performs a read operation, this class will advance the head slot of the + * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads + * will go to. Also, by itself this class will not advance the head slot. + * <p/> + * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code> + * iterations, this sliding window counter will always return object counts that are equal or greater than in the + * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually, + * this is the desired behavior. + * <p/> + * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each: + * <p/> + * <pre> + * {@code + * Sliding window counts of an object X over time + * + * Minute (timeline): + * 1 2 3 4 5 6 7 8 + * + * Observed counts per minute: + * 1 1 1 1 0 0 0 0 + * + * Counts returned by counter: + * 1 2 3 4 4 3 2 1 + * } + * </pre> + * <p/> + * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the + * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load + * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your + * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously + * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing + * counts. 
+ * <p/> + * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes, + * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times + * in the past five minutes", implying that it can only account for the last two of those five minutes because the + * counter was not running before that time. + * + * @param <T> The type of those objects we want to count. + */ +public final class SlidingWindowCounter<T> implements Serializable { + + private static final long serialVersionUID = -2645063988768785810L; + + private SlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int windowLengthInSlots; + + public SlidingWindowCounter(int windowLengthInSlots) { + if (windowLengthInSlots < 2) { + throw new IllegalArgumentException( + "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")"); + } + this.windowLengthInSlots = windowLengthInSlots; + this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots); + + this.headSlot = 0; + this.tailSlot = slotAfter(headSlot); + } + + public void incrementCount(T obj) { + objCounter.incrementCount(obj, headSlot); + } + + /** + * Return the current (total) counts of all tracked objects, then advance the window. + * <p/> + * Whenever this method is called, we consider the counts of the current sliding window to be available to and + * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent + * objects within the next "chunk" of the sliding window. + * + * @return The current (total) counts of all tracked objects. 
+ */ + public Map<T, Long> getCountsThenAdvanceWindow() { + Map<T, Long> counts = objCounter.getCounts(); + objCounter.wipeZeros(); + objCounter.wipeSlot(tailSlot); + advanceHead(); + return counts; + } + + private void advanceHead() { + headSlot = tailSlot; + tailSlot = slotAfter(tailSlot); + } + + private int slotAfter(int slot) { + return (slot + 1) % windowLengthInSlots; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java new file mode 100644 index 0000000..b8ca15b --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.starter.tools; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This class provides per-slot counts of the occurrences of objects. + * <p/> + * It can be used, for instance, as a building block for implementing sliding window counting of objects. + * + * @param <T> The type of those objects we want to count. + */ +public final class SlotBasedCounter<T> implements Serializable { + + private static final long serialVersionUID = 4858185737378394432L; + + private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); + private final int numSlots; + + public SlotBasedCounter(int numSlots) { + if (numSlots <= 0) { + throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); + } + this.numSlots = numSlots; + } + + public void incrementCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + counts = new long[this.numSlots]; + objToCounts.put(obj, counts); + } + counts[slot]++; + } + + public long getCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + return 0; + } + else { + return counts[slot]; + } + } + + public Map<T, Long> getCounts() { + Map<T, Long> result = new HashMap<T, Long>(); + for (T obj : objToCounts.keySet()) { + result.put(obj, computeTotalCount(obj)); + } + return result; + } + + private long computeTotalCount(T obj) { + long[] curr = objToCounts.get(obj); + long total = 0; + for (long l : curr) { + total += l; + } + return total; + } + + /** + * Reset the slot count of any tracked objects to zero for the given slot. 
+ * + * @param slot + */ + public void wipeSlot(int slot) { + for (T obj : objToCounts.keySet()) { + resetSlotCountToZero(obj, slot); + } + } + + private void resetSlotCountToZero(T obj, int slot) { + long[] counts = objToCounts.get(obj); + counts[slot] = 0; + } + + private boolean shouldBeRemovedFromCounter(T obj) { + return computeTotalCount(obj) == 0; + } + + /** + * Remove any object from the counter whose total count is zero (to free up memory). + */ + public void wipeZeros() { + Set<T> objToBeRemoved = new HashSet<T>(); + for (T obj : objToCounts.keySet()) { + if (shouldBeRemovedFromCounter(obj)) { + objToBeRemoved.add(obj); + } + } + for (T obj : objToBeRemoved) { + objToCounts.remove(obj); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java new file mode 100644 index 0000000..dc4cb4b --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ +package org.apache.storm.starter.trident; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.LocalDRPC; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SchemeAsMultiScheme; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.storm.kafka.StringScheme; +import org.apache.storm.kafka.ZkHosts; +import org.apache.storm.kafka.bolt.KafkaBolt; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout; +import org.apache.storm.kafka.trident.TridentKafkaConfig; +import org.apache.storm.starter.spout.RandomSentenceSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Count; +import org.apache.storm.trident.operation.builtin.FilterNull; +import org.apache.storm.trident.operation.builtin.MapGet; +import org.apache.storm.trident.testing.MemoryMapState; +import org.apache.storm.trident.testing.Split; + +import java.util.Properties; + +/** + * A sample word count trident topology using transactional kafka spout that 
has the following components. + * <ol> + * <li> {@link KafkaBolt} + * that receives random sentences from {@link RandomSentenceSpout} and + * publishes the sentences to a kafka "test" topic. + * </li> + * <li> {@link TransactionalTridentKafkaSpout} + * that consumes sentences from the "test" topic, splits it into words, aggregates + * and stores the word count in a {@link MemoryMapState}. + * </li> + * <li> DRPC query + * that returns the word counts by querying the trident state (MemoryMapState). + * </li> + * </ol> + * <p> + * For more background read the <a href="https://storm.apache.org/documentation/Trident-tutorial.html">trident tutorial</a>, + * <a href="https://storm.apache.org/documentation/Trident-state">trident state</a> and + * <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>. + * </p> + */ +public class TridentKafkaWordCount { + + private String zkUrl; + private String brokerUrl; + + TridentKafkaWordCount(String zkUrl, String brokerUrl) { + this.zkUrl = zkUrl; + this.brokerUrl = brokerUrl; + } + + /** + * Creates a transactional kafka spout that consumes any new data published to "test" topic. + * <p/> + * For more info on transactional spouts + * see "Transactional spouts" section in + * <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc. + * + * @return a transactional trident kafka spout. 
+ */ + private TransactionalTridentKafkaSpout createKafkaSpout() { + ZkHosts hosts = new ZkHosts(zkUrl); + TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test"); + config.scheme = new SchemeAsMultiScheme(new StringScheme()); + + // Consume new data from the topic + config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); + return new TransactionalTridentKafkaSpout(config); + } + + + private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) { + return tridentTopology.newDRPCStream("words", drpc) + .each(new Fields("args"), new Split(), new Fields("word")) + .groupBy(new Fields("word")) + .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count")) + .each(new Fields("count"), new FilterNull()) + .project(new Fields("word", "count")); + } + + private TridentState addTridentState(TridentTopology tridentTopology) { + return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1) + .each(new Fields("str"), new Split(), new Fields("word")) + .groupBy(new Fields("word")) + .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) + .parallelismHint(1); + } + + /** + * Creates a trident topology that consumes sentences from the kafka "test" topic using a + * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}. + * A DRPC stream is then created to query the word counts. + * @param drpc + * @return + */ + public StormTopology buildConsumerTopology(LocalDRPC drpc) { + TridentTopology tridentTopology = new TridentTopology(); + addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc); + return tridentTopology.build(); + } + + /** + * Return the consumer topology config. 
+ * + * @return the topology config + */ + public Config getConsumerConfig() { + Config conf = new Config(); + conf.setMaxSpoutPending(20); + // conf.setDebug(true); + return conf; + } + + /** + * A topology that produces random sentences using {@link RandomSentenceSpout} and + * publishes the sentences using a KafkaBolt to kafka "test" topic. + * + * @return the storm topology + */ + public StormTopology buildProducerTopology(Properties prop) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new RandomSentenceSpout(), 2); + /** + * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField + * so that this gets written out as the message in the kafka topic. + */ + KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop) + .withTopicSelector(new DefaultTopicSelector("test")) + .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word")); + builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout"); + return builder.createTopology(); + } + + /** + * Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt. + * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt. + * + * @return the topology config + */ + public Properties getProducerConfig() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer"); + return props; + } + + /** + * <p> + * To run this topology ensure you have a kafka broker running. 
+ * </p> + * Create a topic test with command line, + * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test + */ + public static void main(String[] args) throws Exception { + + String zkUrl = "localhost:2181"; // the defaults. + String brokerUrl = "localhost:9092"; + + if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) { + System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]"); + System.out.println(" E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]"); + System.exit(1); + } else if (args.length == 1) { + zkUrl = args[0]; + } else if (args.length == 2) { + zkUrl = args[0]; + brokerUrl = args[1]; + } + + System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl); + + TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl); + + LocalDRPC drpc = new LocalDRPC(); + LocalCluster cluster = new LocalCluster(); + + // submit the consumer topology. + cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc)); + + Config conf = new Config(); + conf.setMaxSpoutPending(20); + // submit the producer topology. + cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); + + // keep querying the word counts for a minute. 
+ for (int i = 0; i < 60; i++) { + System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); + Thread.sleep(1000); + } + + cluster.killTopology("kafkaBolt"); + cluster.killTopology("wordCounter"); + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java new file mode 100644 index 0000000..056b2b6 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.LocalDRPC; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.CombinerAggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.builtin.MapGet; +import org.apache.storm.trident.operation.builtin.Sum; +import org.apache.storm.trident.state.ReadOnlyState; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.map.ReadOnlyMapState; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.util.*; + +public class TridentReach { + public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ + put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); + put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); + put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); + }}; + + public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ + put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); + put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); + put("tim", Arrays.asList("alex")); + put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); + put("adam", Arrays.asList("david", "carissa")); + put("mike", Arrays.asList("john", "bob")); + put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); + }}; + + public static class 
StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { + public static class Factory implements StateFactory { + Map _map; + + public Factory(Map map) { + _map = map; + } + + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new StaticSingleKeyMapState(_map); + } + + } + + Map _map; + + public StaticSingleKeyMapState(Map map) { + _map = map; + } + + + @Override + public List<Object> multiGet(List<List<Object>> keys) { + List<Object> ret = new ArrayList(); + for (List<Object> key : keys) { + Object singleKey = key.get(0); + ret.add(_map.get(singleKey)); + } + return ret; + } + + } + + public static class One implements CombinerAggregator<Integer> { + @Override + public Integer init(TridentTuple tuple) { + return 1; + } + + @Override + public Integer combine(Integer val1, Integer val2) { + return 1; + } + + @Override + public Integer zero() { + return 1; + } + } + + public static class ExpandList extends BaseFunction { + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + List l = (List) tuple.getValue(0); + if (l != null) { + for (Object o : l) { + collector.emit(new Values(o)); + } + } + } + + } + + public static StormTopology buildTopology(LocalDRPC drpc) { + TridentTopology topology = new TridentTopology(); + TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB)); + TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)); + + + topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields( + "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery( + tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"), + new ExpandList(), new Fields("follower")).groupBy(new 
Fields("follower")).aggregate(new One(), new Fields( + "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach")); + return topology.build(); + } + + public static void main(String[] args) throws Exception { + LocalDRPC drpc = new LocalDRPC(); + + Config conf = new Config(); + LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("reach", conf, buildTopology(drpc)); + + Thread.sleep(2000); + + System.out.println("REACH: " + drpc.execute("reach", "aaa")); + System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1")); + System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5")); + + + cluster.shutdown(); + drpc.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java new file mode 100644 index 0000000..93ccf18 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;


/**
 * Trident word-count example: continuously counts words from a cycling
 * fixed-batch spout into in-memory state, and serves per-word count sums
 * through a "words" DRPC function.
 */
public class TridentWordCount {
  /** Splits the sentence in field 0 on single spaces, emitting one tuple per word. */
  public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(" ")) {
        collector.emit(new Values(word));
      }
    }
  }

  /**
   * Builds the topology: a cycling spout feeding a persistent per-word count,
   * plus a DRPC stream that splits the query string and sums the stored counts
   * of the requested words.
   *
   * @param drpc in-process DRPC server, or null when submitting remotely
   */
  public static StormTopology buildTopology(LocalDRPC drpc) {
    // Batch size 3; setCycle(true) makes the spout replay these sentences forever.
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
        new Values("how many apples can you eat"), new Values("to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    // Continuously aggregate word counts into (non-durable) in-memory map state.
    TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    // DRPC side: split the query into words, look each one up in the state
    // (null for unseen words, filtered out), then sum the counts into "sum".
    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  }

  /**
   * With no args, runs locally and polls the "words" DRPC function once a
   * second for 100 seconds; with args, submits remotely under args[0].
   */
  public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20); // cap in-flight batches to apply backpressure
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      // NOTE(review): the local cluster and DRPC server are never shut down
      // here; the JVM simply exits after the loop — confirm this is intended.
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.util; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; + +public final class StormRunner { + + private static final int MILLIS_IN_SEC = 1000; + + private StormRunner() { + } + + public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) + throws InterruptedException { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology(topologyName, conf, topology); + Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); + cluster.killTopology(topologyName); + cluster.shutdown(); + } + + public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf) + throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + StormSubmitter.submitTopology(topologyName, conf, topology); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java deleted file mode 100644 index 3ea83a1..0000000 --- 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
 * "!" to any string you send the DRPC function.
 *
 * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
 */
public class BasicDRPCTopology {
  /**
   * Appends "!" to the request string. In a LinearDRPCTopologyBuilder tuple,
   * field 0 is the request id (passed through for result correlation) and
   * field 1 is the DRPC argument string.
   */
  public static class ExclaimBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String input = tuple.getString(1);
      // Re-emit the request id unchanged alongside the transformed string.
      collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "result"));
    }

  }

  /**
   * With no args, runs the "exclamation" function locally against two sample
   * words and prints the results; with args, submits remotely under args[0].
   */
  public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3); // parallelism of 3

    Config conf = new Config();

    if (args == null || args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

      for (String word : new String[]{ "hello", "goodbye" }) {
        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
      }

      Thread.sleep(10000);
      drpc.shutdown();
      cluster.shutdown();
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
    }
  }
}
