http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java deleted file mode 100644 index 4c6680e..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java +++ /dev/null @@ -1,432 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.metric.HttpForwardingMetricsServer; -import backtype.storm.metric.HttpForwardingMetricsConsumer; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IMetricsConsumer.TaskInfo; -import backtype.storm.metric.api.IMetricsConsumer.DataPoint; -import backtype.storm.generated.*; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; -import backtype.storm.StormSubmitter; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.storm.metrics.hdrhistogram.HistogramMetric; -import org.HdrHistogram.Histogram; - -/** - * WordCount but the spout goes at a predefined rate and we collect - * proper latency statistics. 
- */ -public class ThroughputVsLatency { - private static class SentWithTime { - public final String sentence; - public final long time; - - SentWithTime(String sentence, long time) { - this.sentence = sentence; - this.time = time; - } - } - - public static class C { - LocalCluster _local = null; - Nimbus.Client _client = null; - - public C(Map conf) { - Map clusterConf = Utils.readStormConfig(); - if (conf != null) { - clusterConf.putAll(conf); - } - Boolean isLocal = (Boolean)clusterConf.get("run.local"); - if (isLocal != null && isLocal) { - _local = new LocalCluster(); - } else { - _client = NimbusClient.getConfiguredClient(clusterConf).getClient(); - } - } - - public ClusterSummary getClusterInfo() throws Exception { - if (_local != null) { - return _local.getClusterInfo(); - } else { - return _client.getClusterInfo(); - } - } - - public TopologyInfo getTopologyInfo(String id) throws Exception { - if (_local != null) { - return _local.getTopologyInfo(id); - } else { - return _client.getTopologyInfo(id); - } - } - - public void killTopologyWithOpts(String name, KillOptions opts) throws Exception { - if (_local != null) { - _local.killTopologyWithOpts(name, opts); - } else { - _client.killTopologyWithOpts(name, opts); - } - } - - public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception { - if (_local != null) { - _local.submitTopology(name, stormConf, topology); - } else { - StormSubmitter.submitTopology(name, stormConf, topology); - } - } - - public boolean isLocal() { - return _local != null; - } - } - - public static class FastRandomSentenceSpout extends BaseRichSpout { - static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", - "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; - - SpoutOutputCollector _collector; - long _periodNano; - long _emitAmount; - Random _rand; - long _nextEmitTime; - long _emitsLeft; - HistogramMetric _histo; - - public FastRandomSentenceSpout(long ratePerSecond) { - if (ratePerSecond > 0) { - _periodNano = Math.max(1, 1000000000/ratePerSecond); - _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano)); - } else { - _periodNano = Long.MAX_VALUE - 1; - _emitAmount = 1; - } - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = ThreadLocalRandom.current(); - _nextEmitTime = System.nanoTime(); - _emitsLeft = _emitAmount; - _histo = new HistogramMetric(3600000000000L, 3); - context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind - } - - @Override - public void nextTuple() { - if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) { - _emitsLeft = _emitAmount; - _nextEmitTime = _nextEmitTime + _periodNano; - } - - if (_emitsLeft > 0) { - String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)]; - _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano)); - _emitsLeft--; - } - } - - @Override - public void ack(Object id) { - long end = System.nanoTime(); - SentWithTime st = (SentWithTime)id; - _histo.recordValue(end-st.time); - } - - @Override - public void fail(Object id) { - SentWithTime st = (SentWithTime)id; - _collector.emit(new Values(st.sentence), id); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - } - - public 
static class SplitSentence extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { - collector.emit(new Values(word, 1)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - private static class MemMeasure { - private long _mem = 0; - private long _time = 0; - - public synchronized void update(long mem) { - _mem = mem; - _time = System.currentTimeMillis(); - } - - public synchronized long get() { - return isExpired() ? 0l : _mem; - } - - public synchronized boolean isExpired() { - return (System.currentTimeMillis() - _time) >= 20000; - } - } - - private static final Histogram _histo = new Histogram(3600000000000L, 3); - private static final AtomicLong _systemCPU = new AtomicLong(0); - private static final AtomicLong _userCPU = new AtomicLong(0); - private static final AtomicLong _gcCount = new AtomicLong(0); - private static final AtomicLong _gcMs = new AtomicLong(0); - private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>(); - - private static long readMemory() { - long total = 0; - for (MemMeasure mem: _memoryBytes.values()) { - total += mem.get(); - } - return total; - } - - private static long _prev_acked = 0; - private static long _prev_uptime = 0; - - public static void printMetrics(C client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts: summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named "+name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - for (ExecutorSummary exec: info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map<String, Long> failedMap = stats.get_failed().get(":all-time"); - Map<String, Long> ackedMap = stats.get_acked().get(":all-time"); - if (ackedMap != null) { - for (String key: ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - acked += ackVal; - } - } - } - } - long ackedThisTime = acked - _prev_acked; - long thisTime = uptime - _prev_uptime; - long nnpct, nnnpct, min, max; - double mean, stddev; - synchronized(_histo) { - nnpct = _histo.getValueAtPercentile(99.0); - nnnpct = _histo.getValueAtPercentile(99.9); - min = _histo.getMinValue(); - max = _histo.getMaxValue(); - mean = _histo.getMean(); - stddev = _histo.getStdDeviation(); - _histo.reset(); - } - long user = _userCPU.getAndSet(0); - long sys = _systemCPU.getAndSet(0); - long gc = _gcMs.getAndSet(0); - double memMB 
= readMemory() / (1024.0 * 1024.0); - System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " + - "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " + - "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n", - uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct, - min, max, mean, stddev, user, sys, gc, memMB); - _prev_uptime = uptime; - _prev_acked = acked; - } - - public static void kill(C client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } - - public static void main(String[] args) throws Exception { - long ratePerSecond = 500; - if (args != null && args.length > 0) { - ratePerSecond = Long.valueOf(args[0]); - } - - int parallelism = 4; - if (args != null && args.length > 1) { - parallelism = Integer.valueOf(args[1]); - } - - int numMins = 5; - if (args != null && args.length > 2) { - numMins = Integer.valueOf(args[2]); - } - - String name = "wc-test"; - if (args != null && args.length > 3) { - name = args[3]; - } - - Config conf = new Config(); - HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) { - @Override - public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { - String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort; - for (DataPoint dp: dataPoints) { - if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) { - synchronized(_histo) { - _histo.add((Histogram)dp.value); - } - } else if ("CPU".equals(dp.name) && dp.value instanceof Map) { - Map<Object, Object> m = (Map<Object, Object>)dp.value; - Object sys = m.get("sys-ms"); - if (sys instanceof Number) { - _systemCPU.getAndAdd(((Number)sys).longValue()); - } - Object user = m.get("user-ms"); - if (user instanceof Number) { - _userCPU.getAndAdd(((Number)user).longValue()); - } - } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) { - Map<Object, Object> m = (Map<Object, Object>)dp.value; - Object count = m.get("count"); - if (count instanceof Number) { - _gcCount.getAndAdd(((Number)count).longValue()); - } - Object time = m.get("timeMs"); - if (time instanceof Number) { - _gcMs.getAndAdd(((Number)time).longValue()); - } - } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) { - Map<Object, Object> m = (Map<Object, Object>)dp.value; - Object val = m.get("usedBytes"); - if (val instanceof Number) { - MemMeasure mm = _memoryBytes.get(worker); - if (mm == null) { - mm = new MemMeasure(); - MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm); - mm = tmp == null ? 
mm : tmp; - } - mm.update(((Number)val).longValue()); - } - } - } - } - }; - - metricServer.serve(); - String url = metricServer.getUrl(); - - C cluster = new C(conf); - conf.setNumWorkers(parallelism); - conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); - conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1); - Map<String, String> workerMetrics = new HashMap<String, String>(); - if (!cluster.isLocal()) { - //sigar uses JNI and does not work in local mode - workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric"); - } - conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics); - conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10); - conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, - "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled"); - conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g"); - - TopologyBuilder builder = new TopologyBuilder(); - - int numEach = 4 * parallelism; - builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach); - - builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word")); - - try { - cluster.submitTopology(name, conf, builder.createTopology()); - - for (int i = 0; i < numMins * 2; i++) { - Thread.sleep(30 * 1000); - printMetrics(cluster, name); - } - } finally { - kill(cluster, name); - } - System.exit(0); - } -}
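The pacing and latency-measurement logic deleted above is the heart of ThroughputVsLatency: FastRandomSentenceSpout turns a target emit rate into a per-period emit budget, and its ack() records end-to-end tuple latency into an HdrHistogram. The following self-contained sketch restates that idea outside of Storm so the arithmetic behind _periodNano and _emitAmount is easier to follow; the class and method names here are illustrative only, and the only assumed dependency is the HdrHistogram library the deleted topology already imports.

import org.HdrHistogram.Histogram;

// Illustrative sketch, not part of storm-starter.
public class RateLimitedEmitterSketch {
    private final long periodNano;                 // length of one emit period in nanoseconds
    private final long emitAmount;                 // tuples allowed per period at the target rate
    private final Histogram latencyHisto = new Histogram(3600000000000L, 3);
    private long nextEmitTime = System.nanoTime(); // start of the current period
    private long emitsLeft;

    public RateLimitedEmitterSketch(long ratePerSecond) {
        // Same arithmetic as FastRandomSentenceSpout: a period of at least 1 ns,
        // and however many tuples fit into that period at the requested rate.
        if (ratePerSecond > 0) {
            periodNano = Math.max(1, 1000000000L / ratePerSecond);
            emitAmount = Math.max(1, (long) ((ratePerSecond / 1000000000.0) * periodNano));
        } else {
            periodNano = Long.MAX_VALUE - 1;
            emitAmount = 1;
        }
        emitsLeft = emitAmount;
    }

    // Called in a tight loop, like nextTuple(); returns true when a tuple may be emitted.
    public boolean maybeEmit() {
        if (emitsLeft <= 0 && nextEmitTime <= System.nanoTime()) {
            emitsLeft = emitAmount;      // refill the budget for the new period
            nextEmitTime += periodNano;  // advance to the next period boundary
        }
        if (emitsLeft > 0) {
            emitsLeft--;                 // a real spout would emit here, anchored with its send time
            return true;
        }
        return false;
    }

    // Called on ack, like the spout's ack(Object id): record how long the tuple was in flight.
    public void onAck(long sentAtNanos) {
        latencyHisto.recordValue(System.nanoTime() - sentAtNanos);
    }

    public long p99LatencyNanos() {
        return latencyHisto.getValueAtPercentile(99.0);
    }
}

In the deleted spout the acked value is measured against the tuple's scheduled emit time (_nextEmitTime - _periodNano), so the histogram reflects complete tuple-tree latency under the configured load rather than just processing time.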
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java deleted file mode 100644 index 706afd1..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.coordination.BatchOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MemoryTransactionalSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBatchBolt; -import backtype.storm.topology.base.BaseTransactionalBolt; -import backtype.storm.transactional.ICommitter; -import backtype.storm.transactional.TransactionAttempt; -import backtype.storm.transactional.TransactionalTopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a - * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. 
- * - * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a> - */ -public class TransactionalGlobalCount { - public static final int PARTITION_TAKE_PER_BATCH = 3; - public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{ - put(0, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("dog")); - add(new Values("chicken")); - add(new Values("cat")); - add(new Values("dog")); - add(new Values("apple")); - }}); - put(1, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("dog")); - add(new Values("apple")); - add(new Values("banana")); - }}); - put(2, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("dog")); - add(new Values("dog")); - add(new Values("dog")); - add(new Values("dog")); - }}); - }}; - - public static class Value { - int count = 0; - BigInteger txid; - } - - public static Map<String, Value> DATABASE = new HashMap<String, Value>(); - public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT"; - - public static class BatchCount extends BaseBatchBolt { - Object _id; - BatchOutputCollector _collector; - - int _count = 0; - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { - _collector = collector; - _id = id; - } - - @Override - public void execute(Tuple tuple) { - _count++; - } - - @Override - public void finishBatch() { - _collector.emit(new Values(_id, _count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "count")); - } - } - - public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter { - TransactionAttempt _attempt; - BatchOutputCollector _collector; - - int _sum = 0; - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { - _collector = collector; - _attempt = attempt; - } - - @Override - public void execute(Tuple tuple) { - _sum += tuple.getInteger(1); - } - - @Override - public void finishBatch() { - Value val = DATABASE.get(GLOBAL_COUNT_KEY); - Value newval; - if (val == null || !val.txid.equals(_attempt.getTransactionId())) { - newval = new Value(); - newval.txid = _attempt.getTransactionId(); - if (val == null) { - newval.count = _sum; - } - else { - newval.count = _sum + val.count; - } - DATABASE.put(GLOBAL_COUNT_KEY, newval); - } - else { - newval = val; - } - _collector.emit(new Values(_attempt, newval.count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "sum")); - } - } - - public static void main(String[] args) throws Exception { - MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); - TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3); - builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout"); - builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count"); - - LocalCluster cluster = new LocalCluster(); - - Config config = new Config(); - config.setDebug(true); - config.setMaxSpoutPending(3); - - cluster.submitTopology("global-count-topology", config, builder.buildTopology()); - - Thread.sleep(3000); - 
cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java deleted file mode 100644 index 4d5ba1b..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.coordination.BatchOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MemoryTransactionalSpout; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseTransactionalBolt; -import backtype.storm.transactional.ICommitter; -import backtype.storm.transactional.TransactionAttempt; -import backtype.storm.transactional.TransactionalTopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This class defines a more involved transactional topology then TransactionalGlobalCount. This topology processes a - * stream of words and produces two outputs: - * <p/> - * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in - * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on. - * <p/> - * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move - * between buckets as their counts accumulate. 
- */ -public class TransactionalWords { - public static class CountValue { - Integer prev_count = null; - int count = 0; - BigInteger txid = null; - } - - public static class BucketValue { - int count = 0; - BigInteger txid; - } - - public static final int BUCKET_SIZE = 10; - - public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>(); - public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>(); - - - public static final int PARTITION_TAKE_PER_BATCH = 3; - - public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{ - put(0, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("dog")); - add(new Values("chicken")); - add(new Values("cat")); - add(new Values("dog")); - add(new Values("apple")); - }}); - put(1, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("dog")); - add(new Values("apple")); - add(new Values("banana")); - }}); - put(2, new ArrayList<List<Object>>() {{ - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("cat")); - add(new Values("dog")); - add(new Values("dog")); - add(new Values("dog")); - add(new Values("dog")); - }}); - }}; - - public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter { - Map<String, Integer> _counts = new HashMap<String, Integer>(); - BatchOutputCollector _collector; - TransactionAttempt _id; - - int _count = 0; - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { - _collector = collector; - _id = id; - } - - @Override - public void execute(Tuple tuple) { - String key = tuple.getString(1); - Integer curr = _counts.get(key); - if (curr == null) - curr = 0; - _counts.put(key, curr + 1); - } - - @Override - public void finishBatch() { - for (String key : _counts.keySet()) { - CountValue val = COUNT_DATABASE.get(key); - CountValue newVal; - if (val == null || !val.txid.equals(_id)) { - newVal = new CountValue(); - newVal.txid = _id.getTransactionId(); - if (val != null) { - newVal.prev_count = val.count; - newVal.count = val.count; - } - newVal.count = newVal.count + _counts.get(key); - COUNT_DATABASE.put(key, newVal); - } - else { - newVal = val; - } - _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "key", "count", "prev-count")); - } - } - - public static class Bucketize extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0); - int curr = tuple.getInteger(2); - Integer prev = tuple.getInteger(3); - - int currBucket = curr / BUCKET_SIZE; - Integer prevBucket = null; - if (prev != null) { - prevBucket = prev / BUCKET_SIZE; - } - - if (prevBucket == null) { - collector.emit(new Values(attempt, currBucket, 1)); - } - else if (currBucket != prevBucket) { - collector.emit(new Values(attempt, currBucket, 1)); - collector.emit(new Values(attempt, prevBucket, -1)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("attempt", "bucket", "delta")); - } - } - - public static class BucketCountUpdater extends BaseTransactionalBolt { - Map<Integer, Integer> _accum = new HashMap<Integer, Integer>(); - 
BatchOutputCollector _collector; - TransactionAttempt _attempt; - - int _count = 0; - - @Override - public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { - _collector = collector; - _attempt = attempt; - } - - @Override - public void execute(Tuple tuple) { - Integer bucket = tuple.getInteger(1); - Integer delta = tuple.getInteger(2); - Integer curr = _accum.get(bucket); - if (curr == null) - curr = 0; - _accum.put(bucket, curr + delta); - } - - @Override - public void finishBatch() { - for (Integer bucket : _accum.keySet()) { - BucketValue currVal = BUCKET_DATABASE.get(bucket); - BucketValue newVal; - if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) { - newVal = new BucketValue(); - newVal.txid = _attempt.getTransactionId(); - newVal.count = _accum.get(bucket); - if (currVal != null) - newVal.count += currVal.count; - BUCKET_DATABASE.put(bucket, newVal); - } - else { - newVal = currVal; - } - _collector.emit(new Values(_attempt, bucket, newVal.count)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "bucket", "count")); - } - } - - public static void main(String[] args) throws Exception { - MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); - TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2); - builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word")); - builder.setBolt("bucketize", new Bucketize()).noneGrouping("count"); - builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket")); - - - LocalCluster cluster = new LocalCluster(); - - Config config = new Config(); - config.setDebug(true); - config.setMaxSpoutPending(3); - - cluster.submitTopology("top-n-topology", config, builder.buildTopology()); - - Thread.sleep(3000); - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java deleted file mode 100644 index 7260beb..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.ShellBolt; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import storm.starter.spout.RandomSentenceSpout; - -import java.util.HashMap; -import java.util.Map; - -/** - * This topology demonstrates Storm's stream groupings and multilang capabilities. - */ -public class WordCountTopology { - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("python", "splitsentence.py"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new RandomSentenceSpout(), 5); - - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.setDebug(true); - - if (args != null && args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } - else { - conf.setMaxTaskParallelism(3); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("word-count", conf, builder.createTopology()); - - Thread.sleep(10000); - - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java deleted file mode 100644 index 3fe982f..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.spout.ShellSpout; -import backtype.storm.task.ShellBolt; -import backtype.storm.topology.*; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import java.util.HashMap; -import java.util.Map; - -/** - * This topology demonstrates Storm's stream groupings and multilang capabilities. - */ -public class WordCountTopologyNode { - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("node", "splitsentence.js"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class RandomSentence extends ShellSpout implements IRichSpout { - - public RandomSentence() { - super("node", "randomsentence.js"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new RandomSentence(), 5); - - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.setDebug(true); - - - if (args != null && args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } - else { - conf.setMaxTaskParallelism(3); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("word-count", conf, builder.createTopology()); - - Thread.sleep(10000); - - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java deleted file mode 100644 index 64ceb29..0000000 --- 
a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.Config; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.TupleUtils; -import org.apache.log4j.Logger; -import storm.starter.tools.Rankings; - -import java.util.HashMap; -import java.util.Map; - -/** - * This abstract bolt provides the basic behavior of bolts that rank objects according to their count. - * <p/> - * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow - * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those - * tuples are retrieved and counted. - */ -public abstract class AbstractRankerBolt extends BaseBasicBolt { - - private static final long serialVersionUID = 4931640198501530202L; - private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2; - private static final int DEFAULT_COUNT = 10; - - private final int emitFrequencyInSeconds; - private final int count; - private final Rankings rankings; - - public AbstractRankerBolt() { - this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); - } - - public AbstractRankerBolt(int topN) { - this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); - } - - public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) { - if (topN < 1) { - throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")"); - } - if (emitFrequencyInSeconds < 1) { - throw new IllegalArgumentException( - "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)"); - } - count = topN; - this.emitFrequencyInSeconds = emitFrequencyInSeconds; - rankings = new Rankings(count); - } - - protected Rankings getRankings() { - return rankings; - } - - /** - * This method functions as a template method (design pattern). 
- */ - @Override - public final void execute(Tuple tuple, BasicOutputCollector collector) { - if (TupleUtils.isTick(tuple)) { - getLogger().debug("Received tick tuple, triggering emit of current rankings"); - emitRankings(collector); - } - else { - updateRankingsWithTuple(tuple); - } - } - - abstract void updateRankingsWithTuple(Tuple tuple); - - private void emitRankings(BasicOutputCollector collector) { - collector.emit(new Values(rankings.copy())); - getLogger().debug("Rankings: " + rankings); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("rankings")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Map<String, Object> conf = new HashMap<String, Object>(); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); - return conf; - } - - abstract Logger getLogger(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java deleted file mode 100644 index d1805ff..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.tuple.Tuple; -import org.apache.log4j.Logger; -import storm.starter.tools.Rankable; -import storm.starter.tools.RankableObjectWithFields; - -/** - * This bolt ranks incoming objects by their count. - * <p/> - * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1, - * additionalField2, ..., additionalFieldN). 
- */ -public final class IntermediateRankingsBolt extends AbstractRankerBolt { - - private static final long serialVersionUID = -1369800530256637409L; - private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class); - - public IntermediateRankingsBolt() { - super(); - } - - public IntermediateRankingsBolt(int topN) { - super(topN); - } - - public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) { - super(topN, emitFrequencyInSeconds); - } - - @Override - void updateRankingsWithTuple(Tuple tuple) { - Rankable rankable = RankableObjectWithFields.from(tuple); - super.getRankings().updateWith(rankable); - } - - @Override - Logger getLogger() { - return LOG; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java deleted file mode 100644 index 58fc8ca..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Tuple; - - -public class PrinterBolt extends BaseBasicBolt { - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - System.out.println(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer ofd) { - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java deleted file mode 100644 index e222a97..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import org.apache.log4j.Logger; -import storm.starter.tools.NthLastModifiedTimeTracker; -import storm.starter.tools.SlidingWindowCounter; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** - * This bolt aggregates counts from multiple upstream bolts. - */ -public class RollingCountAggBolt extends BaseRichBolt { - private static final long serialVersionUID = 5537727428628598519L; - private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class); - //Mapping of key->upstreamBolt->count - private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>(); - private OutputCollector collector; - - - @SuppressWarnings("rawtypes") - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - Object obj = tuple.getValue(0); - long count = tuple.getLong(1); - int source = tuple.getSourceTask(); - Map<Integer, Long> subCounts = counts.get(obj); - if (subCounts == null) { - subCounts = new HashMap<Integer, Long>(); - counts.put(obj, subCounts); - } - //Update the current count for this object - subCounts.put(source, count); - //Output the sum of all the known counts so for this key - long sum = 0; - for (Long val: subCounts.values()) { - sum += val; - } - collector.emit(new Values(obj, sum)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("obj", "count")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java deleted file mode 100644 index 31f7ee2..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.TupleUtils; -import org.apache.log4j.Logger; -import storm.starter.tools.NthLastModifiedTimeTracker; -import storm.starter.tools.SlidingWindowCounter; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** - * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting. - * <p/> - * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output - * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the - * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five - * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every - * minute. - * <p/> - * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the - * actual duration of the sliding window. The latter is included in case the expected sliding window length (as - * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual - * window length is tracked and calculated for the window, and not individually for each object within a window. - * <p/> - * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window - * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window - * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning - * during the first ~ five minutes of startup time if the window length is set to five minutes). 
- */ -public class RollingCountBolt extends BaseRichBolt { - - private static final long serialVersionUID = 5537727428628598519L; - private static final Logger LOG = Logger.getLogger(RollingCountBolt.class); - private static final int NUM_WINDOW_CHUNKS = 5; - private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60; - private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS; - private static final String WINDOW_LENGTH_WARNING_TEMPLATE = - "Actual window length is %d seconds when it should be %d seconds" - + " (you can safely ignore this warning during the startup phase)"; - - private final SlidingWindowCounter<Object> counter; - private final int windowLengthInSeconds; - private final int emitFrequencyInSeconds; - private OutputCollector collector; - private NthLastModifiedTimeTracker lastModifiedTracker; - - public RollingCountBolt() { - this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); - } - - public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) { - this.windowLengthInSeconds = windowLengthInSeconds; - this.emitFrequencyInSeconds = emitFrequencyInSeconds; - counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds, - this.emitFrequencyInSeconds)); - } - - private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) { - return windowLengthInSeconds / windowUpdateFrequencyInSeconds; - } - - @SuppressWarnings("rawtypes") - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds, - this.emitFrequencyInSeconds)); - } - - @Override - public void execute(Tuple tuple) { - if (TupleUtils.isTick(tuple)) { - LOG.debug("Received tick tuple, triggering emit of current window counts"); - emitCurrentWindowCounts(); - } - else { - countObjAndAck(tuple); - } - } - - private void emitCurrentWindowCounts() { - Map<Object, Long> counts = counter.getCountsThenAdvanceWindow(); - int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification(); - lastModifiedTracker.markAsModified(); - if (actualWindowLengthInSeconds != windowLengthInSeconds) { - LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds)); - } - emit(counts, actualWindowLengthInSeconds); - } - - private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) { - for (Entry<Object, Long> entry : counts.entrySet()) { - Object obj = entry.getKey(); - Long count = entry.getValue(); - collector.emit(new Values(obj, count, actualWindowLengthInSeconds)); - } - } - - private void countObjAndAck(Tuple tuple) { - Object obj = tuple.getValue(0); - counter.incrementCount(obj); - collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Map<String, Object> conf = new HashMap<String, Object>(); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java 
---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java deleted file mode 100644 index 85a7a26..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.Config; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.TimeCacheMap; - -import java.util.*; - -public class SingleJoinBolt extends BaseRichBolt { - OutputCollector _collector; - Fields _idFields; - Fields _outFields; - int _numSources; - TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending; - Map<String, GlobalStreamId> _fieldLocations; - - public SingleJoinBolt(Fields outFields) { - _outFields = outFields; - } - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - _fieldLocations = new HashMap<String, GlobalStreamId>(); - _collector = collector; - int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); - _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); - _numSources = context.getThisSources().size(); - Set<String> idFields = null; - for (GlobalStreamId source : context.getThisSources().keySet()) { - Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); - Set<String> setFields = new HashSet<String>(fields.toList()); - if (idFields == null) - idFields = setFields; - else - idFields.retainAll(setFields); - - for (String outfield : _outFields) { - for (String sourcefield : fields) { - if (outfield.equals(sourcefield)) { - _fieldLocations.put(outfield, source); - } - } - } - } - _idFields = new Fields(new ArrayList<String>(idFields)); - - if (_fieldLocations.size() != _outFields.size()) { - throw new RuntimeException("Cannot find all outfields among sources"); - } - } - - @Override - public void execute(Tuple tuple) { - List<Object> id = tuple.select(_idFields); - GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); - if (!_pending.containsKey(id)) { - _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); - } - Map<GlobalStreamId, Tuple> parts = _pending.get(id); - if (parts.containsKey(streamId)) - throw new RuntimeException("Received same 
side of single join twice"); - parts.put(streamId, tuple); - if (parts.size() == _numSources) { - _pending.remove(id); - List<Object> joinResult = new ArrayList<Object>(); - for (String outField : _outFields) { - GlobalStreamId loc = _fieldLocations.get(outField); - joinResult.add(parts.get(loc).getValueByField(outField)); - } - _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); - - for (Tuple part : parts.values()) { - _collector.ack(part); - } - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(_outFields); - } - - private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> { - @Override - public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) { - for (Tuple tuple : tuples.values()) { - _collector.fail(tuple); - } - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java deleted file mode 100644 index ef3a0b8..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.starter.bolt; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseWindowedBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.windowing.TupleWindow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * Computes sliding window sum - */ -public class SlidingWindowSumBolt extends BaseWindowedBolt { - private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class); - - private int sum = 0; - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(TupleWindow inputWindow) { - /* - * The inputWindow gives a view of - * (a) all the events in the window - * (b) events that expired since last activation of the window - * (c) events that newly arrived since last activation of the window - */ - List<Tuple> tuplesInWindow = inputWindow.get(); - List<Tuple> newTuples = inputWindow.getNew(); - List<Tuple> expiredTuples = inputWindow.getExpired(); - - LOG.debug("Events in current window: " + tuplesInWindow.size()); - /* - * Instead of iterating over all the tuples in the window to compute - * the sum, the values for the new events are added and old events are - * subtracted. Similar optimizations might be possible in other - * windowing computations. - */ - for (Tuple tuple : newTuples) { - sum += (int) tuple.getValue(0); - } - for (Tuple tuple : expiredTuples) { - sum -= (int) tuple.getValue(0); - } - collector.emit(new Values(sum)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sum")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java deleted file mode 100644 index 0e1bb05..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.bolt; - -import backtype.storm.tuple.Tuple; -import org.apache.log4j.Logger; -import storm.starter.tools.Rankings; - -/** - * This bolt merges incoming {@link Rankings}. 
- * <p/> - * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final, - * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}. - */ -public final class TotalRankingsBolt extends AbstractRankerBolt { - - private static final long serialVersionUID = -8447525895532302198L; - private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class); - - public TotalRankingsBolt() { - super(); - } - - public TotalRankingsBolt(int topN) { - super(topN); - } - - public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) { - super(topN, emitFrequencyInSeconds); - } - - @Override - void updateRankingsWithTuple(Tuple tuple) { - Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0); - super.getRankings().updateWith(rankingsToBeMerged); - super.getRankings().pruneZeroCounts(); - } - - @Override - Logger getLogger() { - return LOG; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java deleted file mode 100644 index 5778c8e..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.spout; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import java.util.Map; -import java.util.Random; - -/** - * Emits a random integer and a timestamp value (offset by one day), - * every 100 ms. The ts field can be used in tuple time based windowing. 
- */ -public class RandomIntegerSpout extends BaseRichSpout { - private SpoutOutputCollector collector; - private Random rand; - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("value", "ts")); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - this.rand = new Random(); - } - - @Override - public void nextTuple() { - Utils.sleep(100); - collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000))); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java deleted file mode 100644 index 813b10c..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.starter.spout; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import java.util.Map; -import java.util.Random; - -public class RandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - Random _rand; - - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = new Random(); - } - - @Override - public void nextTuple() { - Utils.sleep(100); - String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", - "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; - String sentence = sentences[_rand.nextInt(sentences.length)]; - _collector.emit(new Values(sentence)); - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java deleted file mode 100644 index 40f8d72..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package storm.starter.spout; - -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import twitter4j.FilterQuery; -import twitter4j.StallWarning; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterStream; -import twitter4j.TwitterStreamFactory; -import twitter4j.auth.AccessToken; -import twitter4j.conf.ConfigurationBuilder; - -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -@SuppressWarnings("serial") -public class TwitterSampleSpout extends BaseRichSpout { - - SpoutOutputCollector _collector; - LinkedBlockingQueue<Status> queue = null; - TwitterStream _twitterStream; - String consumerKey; - String consumerSecret; - String accessToken; - String accessTokenSecret; - String[] keyWords; - - public TwitterSampleSpout(String consumerKey, String consumerSecret, - String accessToken, String accessTokenSecret, String[] keyWords) { - this.consumerKey = consumerKey; - this.consumerSecret = consumerSecret; - this.accessToken = accessToken; - this.accessTokenSecret = accessTokenSecret; - this.keyWords = keyWords; - } - - public TwitterSampleSpout() { - // TODO Auto-generated constructor stub - } - - @Override - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { - queue = new LinkedBlockingQueue<Status>(1000); - _collector = collector; - - StatusListener listener = new StatusListener() { - - @Override - public void onStatus(Status status) { - - queue.offer(status); - } - - @Override - public void onDeletionNotice(StatusDeletionNotice sdn) { - } - - @Override - public void onTrackLimitationNotice(int i) { - } - - @Override - public void onScrubGeo(long l, long l1) { - } - - @Override - public void onException(Exception ex) { - } - - @Override - public void onStallWarning(StallWarning arg0) { - // TODO Auto-generated method stub - - } - - }; - - TwitterStream twitterStream = new TwitterStreamFactory( - new ConfigurationBuilder().setJSONStoreEnabled(true).build()) - .getInstance(); - - twitterStream.addListener(listener); - twitterStream.setOAuthConsumer(consumerKey, consumerSecret); - AccessToken token = new AccessToken(accessToken, accessTokenSecret); - twitterStream.setOAuthAccessToken(token); - - if (keyWords.length == 0) { - - twitterStream.sample(); - } - - else { - - FilterQuery query = new FilterQuery().track(keyWords); - twitterStream.filter(query); - } - - } - - @Override - public void nextTuple() { - Status ret = queue.poll(); - if (ret == null) { - Utils.sleep(50); - } else { - _collector.emit(new Values(ret)); - - } - } - - @Override - public void close() { - _twitterStream.shutdown(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Config ret = new Config(); - ret.setMaxTaskParallelism(1); - return ret; - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("tweet")); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java 
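Before the next file, a minimal submission sketch for the TwitterSampleSpout removed above; the credential strings are placeholders (not real keys) and the component and topology names are hypothetical. An empty keyWords array selects the unfiltered sample stream. Note that open() in the removed spout stores the stream only in a local variable, so the _twitterStream field that close() shuts down is never assigned there.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.starter.spout.TwitterSampleSpout;

public class TwitterSampleSketch {
  public static void main(String[] args) throws Exception {
    String consumerKey = "<consumer-key>";               // placeholder
    String consumerSecret = "<consumer-secret>";         // placeholder
    String accessToken = "<access-token>";               // placeholder
    String accessTokenSecret = "<access-token-secret>";  // placeholder
    String[] keyWords = new String[0];  // empty array => unfiltered sample stream

    TopologyBuilder builder = new TopologyBuilder();
    // The spout declares a single "tweet" field carrying the twitter4j Status
    // and caps itself at one task via setMaxTaskParallelism(1).
    builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
        accessToken, accessTokenSecret, keyWords), 1);
    // downstream bolts consuming the "tweet" field would be added here

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("twitter-sample-sketch", new Config(), builder.createTopology());
    Utils.sleep(60 * 1000);
    cluster.killTopology("twitter-sample-sketch");
    cluster.shutdown();
  }
}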
---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java b/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java deleted file mode 100644 index 08df8cf..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import backtype.storm.utils.Time; -import org.apache.commons.collections.buffer.CircularFifoBuffer; - -/** - * This class tracks the time-since-last-modify of a "thing" in a rolling fashion. - * <p/> - * For example, create a 5-slot tracker to track the five most recent time-since-last-modify. - * <p/> - * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just - * been modified. - */ -public class NthLastModifiedTimeTracker { - - private static final int MILLIS_IN_SEC = 1000; - - private final CircularFifoBuffer lastModifiedTimesMillis; - - public NthLastModifiedTimeTracker(int numTimesToTrack) { - if (numTimesToTrack < 1) { - throw new IllegalArgumentException( - "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")"); - } - lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack); - initLastModifiedTimesMillis(); - } - - private void initLastModifiedTimesMillis() { - long nowCached = now(); - for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) { - lastModifiedTimesMillis.add(Long.valueOf(nowCached)); - } - } - - private long now() { - return Time.currentTimeMillis(); - } - - public int secondsSinceOldestModification() { - long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue(); - return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC); - } - - public void markAsModified() { - updateLastModifiedTime(); - } - - private void updateLastModifiedTime() { - lastModifiedTimesMillis.add(now()); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java b/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java deleted file mode 100644 index 85e3d1d..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -public interface Rankable extends Comparable<Rankable> { - - Object getObject(); - - long getCount(); - - /** - * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is. - * - * @return a defensive copy - */ - Rankable copy(); -}
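Since the Rankings merged by TotalRankingsBolt above are built from Rankable instances, a minimal implementation sketch follows; the class name and field choices are illustrative assumptions, and storm-starter's actual implementation is RankableObjectWithFields, which is not part of this diff.

package storm.starter.tools;

// Illustrative Rankable backed by an arbitrary object and a count.
public class SimpleRankable implements Rankable {
  private final Object obj;
  private final long count;

  public SimpleRankable(Object obj, long count) {
    this.obj = obj;
    this.count = count;
  }

  @Override
  public Object getObject() {
    return obj;
  }

  @Override
  public long getCount() {
    return count;
  }

  @Override
  public Rankable copy() {
    // Per the interface note, the wrapped object itself is not defensively
    // copied; only the Rankable wrapper is duplicated.
    return new SimpleRankable(obj, count);
  }

  @Override
  public int compareTo(Rankable other) {
    return Long.compare(count, other.getCount());
  }
}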

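Finally, a wiring sketch for the SingleJoinBolt removed earlier in this commit, following the gender/age pattern of storm-starter's SingleJoinExample; the FeederSpout field layouts and the sample values fed at the end are assumptions of the sketch. Both input streams must share the id field(s) used for the fieldsGrouping, and pending halves are failed once the topology message timeout elapses via the TimeCacheMap expire callback.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.starter.bolt.SingleJoinBolt;

public class SingleJoinSketch {
  public static void main(String[] args) throws Exception {
    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("gender", genderSpout);
    builder.setSpout("age", ageSpout);
    // Both inputs are grouped on the shared "id" field, which becomes the join
    // key; the bolt emits ("gender", "age") once both sides of an id have
    // arrived, anchored on the contributing tuples.
    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
        .fieldsGrouping("gender", new Fields("id"))
        .fieldsGrouping("age", new Fields("id"));

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("single-join-sketch", new Config(), builder.createTopology());

    genderSpout.feed(new Values(1, "male"));
    ageSpout.feed(new Values(1, 32));  // completes the join for id 1

    Utils.sleep(5 * 1000);
    cluster.killTopology("single-join-sketch");
    cluster.shutdown();
  }
}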