/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.*;

/**
 * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
 * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
 * <p/>
 * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
 * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
 * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
 * records.
 * <p/>
 * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
 * minutes on a single machine into one that takes just a couple seconds.
 * <p/>
 * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
 *
 * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
 */
public class ReachTopology {
  // NOTE: plain static initializers are used instead of the double-brace
  // ("new HashMap<...>() {{ ... }}") idiom the original carried: double-brace
  // initialization creates an anonymous HashMap subclass per field, which
  // bloats the classloader, breaks equals() symmetry against plain maps, and
  // pins a reference to the enclosing class when serialized.

  /** url -> list of people who tweeted the url (stand-in for a real DB). */
  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>();

  /** person -> that person's followers (stand-in for a real DB). */
  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>();

  static {
    TWEETERS_DB.put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
    TWEETERS_DB.put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
    TWEETERS_DB.put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));

    FOLLOWERS_DB.put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
    FOLLOWERS_DB.put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
    FOLLOWERS_DB.put("tim", Arrays.asList("alex"));
    FOLLOWERS_DB.put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
    FOLLOWERS_DB.put("adam", Arrays.asList("david", "carissa"));
    FOLLOWERS_DB.put("mike", Arrays.asList("john", "bob"));
    FOLLOWERS_DB.put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
  }

  /** Expands a request (id, url) into one (id, tweeter) tuple per person who tweeted the url. */
  public static class GetTweeters extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      Object id = tuple.getValue(0);
      String url = tuple.getString(1);
      List<String> tweeters = TWEETERS_DB.get(url);
      if (tweeters != null) {
        for (String tweeter : tweeters) {
          collector.emit(new Values(id, tweeter));
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "tweeter"));
    }
  }

  /** Expands each (id, tweeter) tuple into one (id, follower) tuple per follower of the tweeter. */
  public static class GetFollowers extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      Object id = tuple.getValue(0);
      String tweeter = tuple.getString(1);
      List<String> followers = FOLLOWERS_DB.get(tweeter);
      if (followers != null) {
        for (String follower : followers) {
          collector.emit(new Values(id, follower));
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "follower"));
    }
  }

  /**
   * De-duplicates the followers routed to this task (fields grouping on
   * ["id", "follower"] guarantees each follower lands on exactly one task)
   * and emits the partial unique count at the end of the batch.
   */
  public static class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      _followers.add(tuple.getString(1));
    }

    @Override
    public void finishBatch() {
      // The set size is the number of unique followers seen by this task.
      _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "partial-count"));
    }
  }

  /** Sums the partial unique counts into the final reach number for the request. */
  public static class CountAggregator extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      _count += tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
      _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "reach"));
    }
  }

  /**
   * Builds the linear DRPC pipeline:
   * request -> tweeters -> followers -> partial unique counts -> total reach.
   */
  public static LinearDRPCTopologyBuilder construct() {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
    builder.addBolt(new GetTweeters(), 4);
    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
    return builder;
  }

  /**
   * With no arguments, runs the topology against an in-process LocalDRPC and
   * prints the reach of a few sample URLs; with an argument, submits the
   * topology to a real cluster under that name.
   */
  public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = construct();

    Config conf = new Config();

    if (args == null || args.length == 0) {
      conf.setMaxTaskParallelism(3);
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));

      String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
      for (String url : urlsToTry) {
        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
      }

      cluster.shutdown();
      drpc.shutdown();
    }
    else {
      conf.setNumWorkers(6);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * Demonstrates how to declare per-component CPU and memory requirements and
 * topology-level scheduling knobs for Storm's Resource Aware Scheduler.
 */
public class ResourceAwareExampleTopology {
  /** Appends "!!!" to each incoming word and acks the input tuple. */
  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      // Anchor the emitted tuple to the input so it is tracked for acking.
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10);
    //set cpu requirement
    spout.setCPULoad(20);
    //set onheap and offheap memory requirement
    spout.setMemoryLoad(64, 16);

    BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    //sets cpu requirement. Not necessary to set both CPU and memory.
    //For requirements not set, a default value will be used
    bolt1.setCPULoad(15);

    BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    bolt2.setMemoryLoad(100);

    Config conf = new Config();
    conf.setDebug(true);

    /*
     * Use to limit the maximum amount of memory (in MB) allocated to one worker process.
     * Can be used to spread executors to multiple workers.
     */
    conf.setTopologyWorkerMaxHeapSize(1024.0);

    //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
    //Recommended range of 0-29 but no hard limit set.
    conf.setTopologyPriority(29);

    // Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
    conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      // No args: run locally for 10 seconds, then tear everything down.
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.log4j.Logger;
import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
import org.apache.storm.starter.bolt.RollingCountBolt;
import org.apache.storm.starter.bolt.TotalRankingsBolt;
import org.apache.storm.starter.util.StormRunner;

/**
 * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
 * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
 * like trending topics or trending images on Twitter.
 */
public class RollingTopWords {

  private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
  private static final int TOP_N = 5;

  private final TopologyBuilder builder;
  private final String topologyName;
  private final Config topologyConfig;
  private final int runtimeInSeconds;

  /**
   * Builds the topology and prepares the config; the topology is not
   * submitted until {@link #runLocally()} or {@link #runRemotely()} is called.
   *
   * @param topologyName name under which the topology will be submitted
   */
  public RollingTopWords(String topologyName) throws InterruptedException {
    builder = new TopologyBuilder();
    this.topologyName = topologyName;
    topologyConfig = createTopologyConfiguration();
    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;

    wireTopology();
  }

  private static Config createTopologyConfiguration() {
    Config conf = new Config();
    conf.setDebug(true);
    return conf;
  }

  /**
   * Wires spout -> rolling counter -> intermediate rankers -> global final ranker.
   * The counter uses a 9-second window advanced every 3 seconds.
   */
  private void wireTopology() throws InterruptedException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
        "obj"));
    // globalGrouping: a single task merges all intermediate rankings.
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
  }

  /** Runs the topology in an in-process LocalCluster for {@code runtimeInSeconds}. */
  public void runLocally() throws InterruptedException {
    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
  }

  /** Submits the topology to a remote cluster. */
  public void runRemotely() throws Exception {
    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
  }

  /**
   * Submits (runs) the topology.
   *
   * Usage: "RollingTopWords [topology-name] [local|remote]"
   *
   * By default, the topology is run locally under the name "slidingWindowCounts".
   *
   * Examples:
   *
   * ```
   *
   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
   *
   * # Runs in local mode (LocalCluster), with topology name "foobar"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar
   *
   * # Runs in local mode (LocalCluster), with topology name "foobar"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords foobar local
   *
   * # Runs in remote/cluster mode, with topology name "production-topology"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology remote
   * ```
   *
   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    String topologyName = "slidingWindowCounts";
    if (args.length >= 1) {
      topologyName = args[0];
    }
    // Anything other than an explicit "remote" second argument runs locally.
    boolean runLocally = true;
    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
      runLocally = false;
    }

    LOG.info("Topology name: " + topologyName);
    RollingTopWords rtw = new RollingTopWords(topologyName);
    if (runLocally) {
      LOG.info("Running in local mode");
      rtw.runLocally();
    }
    else {
      LOG.info("Running in remote (cluster) mode");
      rtw.runRemotely();
    }
  }
}
+ */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.apache.storm.starter.bolt.SingleJoinBolt; + +public class SingleJoinExample { + public static void main(String[] args) { + FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); + FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("gender", genderSpout); + builder.setSpout("age", ageSpout); + builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) + .fieldsGrouping("age", new Fields("id")); + + Config conf = new Config(); + conf.setDebug(true); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("join-example", conf, builder.createTopology()); + + for (int i = 0; i < 10; i++) { + String gender; + if (i % 2 == 0) { + gender = "male"; + } + else { + gender = "female"; + } + genderSpout.feed(new Values(i, gender)); + } + + for (int i = 9; i >= 0; i--) { + ageSpout.feed(new Values(i, i + 20)); + } + + Utils.sleep(2000); + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java new file mode 100644 index 0000000..3addc15 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.log4j.Logger;
import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
import org.apache.storm.starter.bolt.RollingCountBolt;
import org.apache.storm.starter.bolt.RollingCountAggBolt;
import org.apache.storm.starter.bolt.TotalRankingsBolt;
import org.apache.storm.starter.util.StormRunner;

/**
 * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
 * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
 * like trending topics or trending images on Twitter. It takes an approach that assumes that some words will be much
 * more common than other words, and uses partialKeyGrouping to better balance the skewed load.
 */
public class SkewedRollingTopWords {
  private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
  private static final int TOP_N = 5;

  private final TopologyBuilder builder;
  private final String topologyName;
  private final Config topologyConfig;
  private final int runtimeInSeconds;

  /**
   * Builds the topology and prepares the config; the topology is not
   * submitted until {@link #runLocally()} or {@link #runRemotely()} is called.
   *
   * @param topologyName name under which the topology will be submitted
   */
  public SkewedRollingTopWords(String topologyName) throws InterruptedException {
    builder = new TopologyBuilder();
    this.topologyName = topologyName;
    topologyConfig = createTopologyConfiguration();
    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;

    wireTopology();
  }

  private static Config createTopologyConfiguration() {
    Config conf = new Config();
    conf.setDebug(true);
    return conf;
  }

  /**
   * Wires spout -> rolling counters -> count aggregator -> intermediate
   * rankers -> global final ranker. Unlike RollingTopWords, the counters are
   * fed via partialKeyGrouping (splitting each hot key across two counter
   * tasks), so a RollingCountAggBolt is needed to re-combine the partial
   * counts before ranking.
   */
  private void wireTopology() throws InterruptedException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String aggId = "aggregator";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
    // globalGrouping: a single task merges all intermediate rankings.
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
  }

  /** Runs the topology in an in-process LocalCluster for {@code runtimeInSeconds}. */
  public void runLocally() throws InterruptedException {
    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
  }

  /** Submits the topology to a remote cluster. */
  public void runRemotely() throws Exception {
    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
  }

  /**
   * Submits (runs) the topology.
   *
   * Usage: "SkewedRollingTopWords [topology-name] [local|remote]"
   *
   * By default, the topology is run locally under the name "slidingWindowCounts".
   *
   * Examples:
   *
   * ```
   *
   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
   *
   * # Runs in local mode (LocalCluster), with topology name "foobar"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar
   *
   * # Runs in local mode (LocalCluster), with topology name "foobar"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords foobar local
   *
   * # Runs in remote/cluster mode, with topology name "production-topology"
   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology remote
   * ```
   *
   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    String topologyName = "slidingWindowCounts";
    if (args.length >= 1) {
      topologyName = args[0];
    }
    // Anything other than an explicit "remote" second argument runs locally.
    boolean runLocally = true;
    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
      runLocally = false;
    }

    LOG.info("Topology name: " + topologyName);
    SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
    if (runLocally) {
      LOG.info("Running in local mode");
      rtw.runLocally();
    }
    else {
      LOG.info("Running in remote (cluster) mode");
      rtw.runRemotely();
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.utils.Utils;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
import org.apache.storm.starter.spout.RandomIntegerSpout;

import java.util.concurrent.TimeUnit;

import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;

/**
 * Windowing based on tuple timestamp (e.g. the time when tuple is generated
 * rather than when its processed).
 */
public class SlidingTupleTsTopology {
  public static void main(String[] args) throws Exception {
    // 5s window sliding every 3s, keyed off the tuple's "ts" field, with a 5s
    // allowance for late-arriving tuples.
    BaseWindowedBolt windowedSum = new SlidingWindowSumBolt()
        .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
        .withTimestampField("ts")
        .withLag(new Duration(5, TimeUnit.SECONDS));

    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("integer", new RandomIntegerSpout(), 1);
    topology.setBolt("slidingsum", windowedSum, 1).shuffleGrouping("integer");
    topology.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");

    Config conf = new Config();
    conf.setDebug(true);

    boolean remote = args != null && args.length > 0;
    if (remote) {
      conf.setNumWorkers(1);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology.createTopology());
    } else {
      // Local run: let it produce output for 40 seconds, then tear down.
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, topology.createTopology());
      Utils.sleep(40000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.bolt.SlidingWindowSumBolt;
import org.apache.storm.starter.spout.RandomIntegerSpout;

import java.util.List;
import java.util.Map;

import static org.apache.storm.topology.base.BaseWindowedBolt.Count;

/**
 * A sample topology that demonstrates the usage of {@link org.apache.storm.topology.IWindowedBolt}
 * to calculate sliding window sum.
 */
public class SlidingWindowTopology {

  private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);

  /*
   * Computes tumbling window average
   */
  private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
      List<Tuple> tuples = inputWindow.get();
      LOG.debug("Events in current window: " + tuples.size());
      if (!tuples.isEmpty()) {
        // Tumbling windows don't overlap, so every tuple in the window
        // contributes to exactly one average (integer division, as before).
        int total = 0;
        for (Tuple t : tuples) {
          total += (int) t.getValue(0);
        }
        collector.emit(new Values(total / tuples.size()));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("avg"));
    }
  }


  public static void main(String[] args) throws Exception {
    TopologyBuilder topology = new TopologyBuilder();
    // integer -> 30-tuple sliding sums (every 10) -> 3-tuple tumbling averages -> printer
    topology.setSpout("integer", new RandomIntegerSpout(), 1);
    topology.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
        .shuffleGrouping("integer");
    topology.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
        .shuffleGrouping("slidingsum");
    topology.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");

    Config conf = new Config();
    conf.setDebug(true);

    boolean remote = args != null && args.length > 0;
    if (remote) {
      conf.setNumWorkers(1);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, topology.createTopology());
    } else {
      // Local run: let it produce output for 40 seconds, then tear down.
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, topology.createTopology());
      Utils.sleep(40000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.HttpForwardingMetricsServer;
import org.apache.storm.metric.HttpForwardingMetricsConsumer;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
import org.apache.storm.generated.*;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
// NOTE: a duplicate "import org.apache.storm.StormSubmitter;" was removed here; the
// class was imported twice in the original.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
import org.HdrHistogram.Histogram;

/**
 * WordCount but the spout goes at a predefined rate and we collect
 * proper latency statistics.
 */
public class ThroughputVsLatency {
    /** Message id that carries the sentence and its scheduled emit time (nanos). */
    private static class SentWithTime {
        public final String sentence;
        public final long time;

        SentWithTime(String sentence, long time) {
            this.sentence = sentence;
            this.time = time;
        }
    }

    /**
     * Small facade that targets either a {@link LocalCluster} or a remote Nimbus,
     * selected by the "run.local" config key.
     */
    public static class C {
        LocalCluster _local = null;
        Nimbus.Client _client = null;

        public C(Map conf) {
            Map clusterConf = Utils.readStormConfig();
            if (conf != null) {
                clusterConf.putAll(conf);
            }
            Boolean isLocal = (Boolean)clusterConf.get("run.local");
            if (isLocal != null && isLocal) {
                _local = new LocalCluster();
            } else {
                _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
            }
        }

        public ClusterSummary getClusterInfo() throws Exception {
            if (_local != null) {
                return _local.getClusterInfo();
            } else {
                return _client.getClusterInfo();
            }
        }

        public TopologyInfo getTopologyInfo(String id) throws Exception {
            if (_local != null) {
                return _local.getTopologyInfo(id);
            } else {
                return _client.getTopologyInfo(id);
            }
        }

        public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
            if (_local != null) {
                _local.killTopologyWithOpts(name, opts);
            } else {
                _client.killTopologyWithOpts(name, opts);
            }
        }

        public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
            if (_local != null) {
                _local.submitTopology(name, stormConf, topology);
            } else {
                StormSubmitter.submitTopology(name, stormConf, topology);
            }
        }

        public boolean isLocal() {
            return _local != null;
        }
    }

    /**
     * Spout that emits random sentences at (approximately) a fixed rate and records
     * ack latency into an HdrHistogram metric.
     */
    public static class FastRandomSentenceSpout extends BaseRichSpout {
        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };

        SpoutOutputCollector _collector;
        long _periodNano;
        long _emitAmount;
        Random _rand;
        long _nextEmitTime;
        long _emitsLeft;
        HistogramMetric _histo;

        public FastRandomSentenceSpout(long ratePerSecond) {
            if (ratePerSecond > 0) {
                // Emit in batches of _emitAmount every _periodNano nanoseconds.
                _periodNano = Math.max(1, 1000000000/ratePerSecond);
                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
            } else {
                // Non-positive rate: effectively emit once and then stall.
                _periodNano = Long.MAX_VALUE - 1;
                _emitAmount = 1;
            }
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = ThreadLocalRandom.current();
            _nextEmitTime = System.nanoTime();
            _emitsLeft = _emitAmount;
            _histo = new HistogramMetric(3600000000000L, 3);
            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
        }

        @Override
        public void nextTuple() {
            // Refill the emit budget once the current period has elapsed.
            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
                _emitsLeft = _emitAmount;
                _nextEmitTime = _nextEmitTime + _periodNano;
            }

            if (_emitsLeft > 0) {
                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
                // Latency is measured from the scheduled emit time, not the actual one.
                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
                _emitsLeft--;
            }
        }

        @Override
        public void ack(Object id) {
            long end = System.nanoTime();
            SentWithTime st = (SentWithTime)id;
            _histo.recordValue(end-st.time);
        }

        @Override
        public void fail(Object id) {
            // Replay the failed sentence with the same message id (original emit time kept).
            SentWithTime st = (SentWithTime)id;
            _collector.emit(new Values(st.sentence), id);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    /** Splits sentences on whitespace, emitting (word, 1) pairs. */
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for (String word: sentence.split("\\s+")) {
                collector.emit(new Values(word, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /** Keeps a per-word running count in memory and emits the updated count. */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /** A memory sample that expires after 20s so dead workers stop counting. */
    private static class MemMeasure {
        private long _mem = 0;
        private long _time = 0;

        public synchronized void update(long mem) {
            _mem = mem;
            _time = System.currentTimeMillis();
        }

        public synchronized long get() {
            // Use the proper "L" suffix; the original's lowercase "0l" is easy to
            // misread as "01".
            return isExpired() ? 0L : _mem;
        }

        public synchronized boolean isExpired() {
            return (System.currentTimeMillis() - _time) >= 20000;
        }
    }

    private static final Histogram _histo = new Histogram(3600000000000L, 3);
    private static final AtomicLong _systemCPU = new AtomicLong(0);
    private static final AtomicLong _userCPU = new AtomicLong(0);
    private static final AtomicLong _gcCount = new AtomicLong(0);
    private static final AtomicLong _gcMs = new AtomicLong(0);
    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();

    /** Total (non-expired) used bytes reported across all workers. */
    private static long readMemory() {
        long total = 0;
        for (MemMeasure mem: _memoryBytes.values()) {
            total += mem.get();
        }
        return total;
    }

    private static long _prev_acked = 0;
    private static long _prev_uptime = 0;

    /**
     * Looks up the topology by name, aggregates spout ack/fail counts, and prints one
     * line of throughput/latency/CPU/GC/memory statistics for the interval since the
     * previous call. Resets the shared histogram and CPU/GC accumulators.
     */
    public static void printMetrics(C client, String name) throws Exception {
        ClusterSummary summary = client.getClusterInfo();
        String id = null;
        for (TopologySummary ts: summary.get_topologies()) {
            if (name.equals(ts.get_name())) {
                id = ts.get_id();
            }
        }
        if (id == null) {
            throw new Exception("Could not find a topology named "+name);
        }
        TopologyInfo info = client.getTopologyInfo(id);
        int uptime = info.get_uptime_secs();
        long acked = 0;
        long failed = 0;
        for (ExecutorSummary exec: info.get_executors()) {
            if ("spout".equals(exec.get_component_id())) {
                SpoutStats stats = exec.get_stats().get_specific().get_spout();
                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
                if (ackedMap != null) {
                    for (String key: ackedMap.keySet()) {
                        if (failedMap != null) {
                            Long tmp = failedMap.get(key);
                            if (tmp != null) {
                                failed += tmp;
                            }
                        }
                        long ackVal = ackedMap.get(key);
                        acked += ackVal;
                    }
                }
            }
        }
        long ackedThisTime = acked - _prev_acked;
        long thisTime = uptime - _prev_uptime;
        long nnpct, nnnpct, min, max;
        double mean, stddev;
        synchronized(_histo) {
            nnpct = _histo.getValueAtPercentile(99.0);
            nnnpct = _histo.getValueAtPercentile(99.9);
            min = _histo.getMinValue();
            max = _histo.getMaxValue();
            mean = _histo.getMean();
            stddev = _histo.getStdDeviation();
            _histo.reset();
        }
        long user = _userCPU.getAndSet(0);
        long sys = _systemCPU.getAndSet(0);
        long gc = _gcMs.getAndSet(0);
        double memMB = readMemory() / (1024.0 * 1024.0);
        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
                        "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
                        "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
                uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
                min, max, mean, stddev, user, sys, gc, memMB);
        _prev_uptime = uptime;
        _prev_acked = acked;
    }

    /** Kills the topology immediately (no wait). */
    public static void kill(C client, String name) throws Exception {
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(0);
        client.killTopologyWithOpts(name, opts);
    }

    /**
     * Args: [ratePerSecond [parallelism [numMins [topologyName]]]]. Starts an HTTP
     * metrics sink, submits the word-count topology, prints metrics every 30s for
     * numMins minutes, then kills the topology and exits.
     */
    public static void main(String[] args) throws Exception {
        long ratePerSecond = 500;
        if (args != null && args.length > 0) {
            ratePerSecond = Long.valueOf(args[0]);
        }

        int parallelism = 4;
        if (args != null && args.length > 1) {
            parallelism = Integer.valueOf(args[1]);
        }

        int numMins = 5;
        if (args != null && args.length > 2) {
            numMins = Integer.valueOf(args[2]);
        }

        String name = "wc-test";
        if (args != null && args.length > 3) {
            name = args[3];
        }

        Config conf = new Config();
        // Receives metrics forwarded from the workers and folds them into the static
        // accumulators read by printMetrics().
        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
            @Override
            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
                for (DataPoint dp: dataPoints) {
                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
                        synchronized(_histo) {
                            _histo.add((Histogram)dp.value);
                        }
                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
                        Object sys = m.get("sys-ms");
                        if (sys instanceof Number) {
                            _systemCPU.getAndAdd(((Number)sys).longValue());
                        }
                        Object user = m.get("user-ms");
                        if (user instanceof Number) {
                            _userCPU.getAndAdd(((Number)user).longValue());
                        }
                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
                        Object count = m.get("count");
                        if (count instanceof Number) {
                            _gcCount.getAndAdd(((Number)count).longValue());
                        }
                        Object time = m.get("timeMs");
                        if (time instanceof Number) {
                            _gcMs.getAndAdd(((Number)time).longValue());
                        }
                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
                        Object val = m.get("usedBytes");
                        if (val instanceof Number) {
                            MemMeasure mm = _memoryBytes.get(worker);
                            if (mm == null) {
                                mm = new MemMeasure();
                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
                                mm = tmp == null ? mm : tmp;
                            }
                            mm.update(((Number)val).longValue());
                        }
                    }
                }
            }
        };

        metricServer.serve();
        String url = metricServer.getUrl();

        C cluster = new C(conf);
        conf.setNumWorkers(parallelism);
        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
        conf.registerMetricsConsumer(org.apache.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
        Map<String, String> workerMetrics = new HashMap<String, String>();
        if (!cluster.isLocal()) {
            //sigar uses JNI and does not work in local mode
            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
        }
        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
        // The original listed -XX:+UseConcMarkSweepGC twice; the duplicate flag was
        // redundant and has been removed.
        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");

        TopologyBuilder builder = new TopologyBuilder();

        int numEach = 4 * parallelism;
        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);

        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));

        try {
            cluster.submitTopology(name, conf, builder.createTopology());

            for (int i = 0; i < numMins * 2; i++) {
                Thread.sleep(30 * 1000);
                printMetrics(cluster, name);
            }
        } finally {
            kill(cluster, name);
        }
        System.exit(0);
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
new file mode 100644
index 0000000..312f83e
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -0,0 +1,174 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MemoryTransactionalSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
 * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
 *
 * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
 */
public class TransactionalGlobalCount {
    public static final int PARTITION_TAKE_PER_BATCH = 3;

    /** Builds one in-memory partition: each word becomes a single-field tuple. */
    private static List<List<Object>> partition(String... words) {
        List<List<Object>> tuples = new ArrayList<List<Object>>();
        for (String word : words) {
            tuples.add(new Values(word));
        }
        return tuples;
    }

    /** Mock input source: partition id -> list of tuples. */
    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>();
    static {
        DATA.put(0, partition("cat", "dog", "chicken", "cat", "dog", "apple"));
        DATA.put(1, partition("cat", "dog", "apple", "banana"));
        DATA.put(2, partition("cat", "cat", "cat", "cat", "cat", "dog", "dog", "dog", "dog"));
    }

    /** A stored count tagged with the transaction id that last wrote it. */
    public static class Value {
        int count = 0;
        BigInteger txid;
    }

    /** Mock database keyed by {@link #GLOBAL_COUNT_KEY}. */
    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";

    /** Counts the tuples of one batch and emits (batch id, count) when the batch ends. */
    public static class BatchCount extends BaseBatchBolt {
        Object _id;
        BatchOutputCollector _collector;

        int _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            _count += 1;
        }

        @Override
        public void finishBatch() {
            _collector.emit(new Values(_id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }
    }

    /**
     * Committer bolt: folds the partial counts into the mock database exactly once per
     * transaction. The stored txid makes the update idempotent across replays.
     */
    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
        TransactionAttempt _attempt;
        BatchOutputCollector _collector;

        int _sum = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
            _collector = collector;
            _attempt = attempt;
        }

        @Override
        public void execute(Tuple tuple) {
            _sum += tuple.getInteger(1);
        }

        @Override
        public void finishBatch() {
            Value stored = DATABASE.get(GLOBAL_COUNT_KEY);
            // If this transaction already committed (replay), re-emit the stored value.
            boolean alreadyApplied = stored != null && stored.txid.equals(_attempt.getTransactionId());
            Value result;
            if (alreadyApplied) {
                result = stored;
            } else {
                result = new Value();
                result.txid = _attempt.getTransactionId();
                result.count = (stored == null) ? _sum : _sum + stored.count;
                DATABASE.put(GLOBAL_COUNT_KEY, result);
            }
            _collector.emit(new Values(_attempt, result.count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "sum"));
        }
    }

    /** Runs the global-count topology against the in-memory data for a few seconds. */
    public static void main(String[] args) throws Exception {
        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
        builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
        builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

        LocalCluster cluster = new LocalCluster();

        Config config = new Config();
        config.setDebug(true);
        config.setMaxSpoutPending(3);

        cluster.submitTopology("global-count-topology", config, builder.buildTopology());

        Thread.sleep(3000);
        cluster.shutdown();
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
new file mode 100644
index
0000000..64689b0 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MemoryTransactionalSpout; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseTransactionalBolt; +import org.apache.storm.transactional.ICommitter; +import org.apache.storm.transactional.TransactionAttempt; +import org.apache.storm.transactional.TransactionalTopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class defines a more involved transactional topology then TransactionalGlobalCount. 
This topology processes a + * stream of words and produces two outputs: + * <p/> + * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in + * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on. + * <p/> + * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move + * between buckets as their counts accumulate. + */ +public class TransactionalWords { + public static class CountValue { + Integer prev_count = null; + int count = 0; + BigInteger txid = null; + } + + public static class BucketValue { + int count = 0; + BigInteger txid; + } + + public static final int BUCKET_SIZE = 10; + + public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>(); + public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>(); + + + public static final int PARTITION_TAKE_PER_BATCH = 3; + + public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{ + put(0, new ArrayList<List<Object>>() {{ + add(new Values("cat")); + add(new Values("dog")); + add(new Values("chicken")); + add(new Values("cat")); + add(new Values("dog")); + add(new Values("apple")); + }}); + put(1, new ArrayList<List<Object>>() {{ + add(new Values("cat")); + add(new Values("dog")); + add(new Values("apple")); + add(new Values("banana")); + }}); + put(2, new ArrayList<List<Object>>() {{ + add(new Values("cat")); + add(new Values("cat")); + add(new Values("cat")); + add(new Values("cat")); + add(new Values("cat")); + add(new Values("dog")); + add(new Values("dog")); + add(new Values("dog")); + add(new Values("dog")); + }}); + }}; + + public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter { + Map<String, Integer> _counts = new HashMap<String, Integer>(); + BatchOutputCollector _collector; + TransactionAttempt _id; + 
+ int _count = 0; + + @Override + public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { + _collector = collector; + _id = id; + } + + @Override + public void execute(Tuple tuple) { + String key = tuple.getString(1); + Integer curr = _counts.get(key); + if (curr == null) + curr = 0; + _counts.put(key, curr + 1); + } + + @Override + public void finishBatch() { + for (String key : _counts.keySet()) { + CountValue val = COUNT_DATABASE.get(key); + CountValue newVal; + if (val == null || !val.txid.equals(_id)) { + newVal = new CountValue(); + newVal.txid = _id.getTransactionId(); + if (val != null) { + newVal.prev_count = val.count; + newVal.count = val.count; + } + newVal.count = newVal.count + _counts.get(key); + COUNT_DATABASE.put(key, newVal); + } + else { + newVal = val; + } + _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("id", "key", "count", "prev-count")); + } + } + + public static class Bucketize extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0); + int curr = tuple.getInteger(2); + Integer prev = tuple.getInteger(3); + + int currBucket = curr / BUCKET_SIZE; + Integer prevBucket = null; + if (prev != null) { + prevBucket = prev / BUCKET_SIZE; + } + + if (prevBucket == null) { + collector.emit(new Values(attempt, currBucket, 1)); + } + else if (currBucket != prevBucket) { + collector.emit(new Values(attempt, currBucket, 1)); + collector.emit(new Values(attempt, prevBucket, -1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("attempt", "bucket", "delta")); + } + } + + public static class BucketCountUpdater extends BaseTransactionalBolt { + Map<Integer, Integer> _accum = 
new HashMap<Integer, Integer>(); + BatchOutputCollector _collector; + TransactionAttempt _attempt; + + int _count = 0; + + @Override + public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { + _collector = collector; + _attempt = attempt; + } + + @Override + public void execute(Tuple tuple) { + Integer bucket = tuple.getInteger(1); + Integer delta = tuple.getInteger(2); + Integer curr = _accum.get(bucket); + if (curr == null) + curr = 0; + _accum.put(bucket, curr + delta); + } + + @Override + public void finishBatch() { + for (Integer bucket : _accum.keySet()) { + BucketValue currVal = BUCKET_DATABASE.get(bucket); + BucketValue newVal; + if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) { + newVal = new BucketValue(); + newVal.txid = _attempt.getTransactionId(); + newVal.count = _accum.get(bucket); + if (currVal != null) + newVal.count += currVal.count; + BUCKET_DATABASE.put(bucket, newVal); + } + else { + newVal = currVal; + } + _collector.emit(new Values(_attempt, bucket, newVal.count)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("id", "bucket", "count")); + } + } + + public static void main(String[] args) throws Exception { + MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); + TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2); + builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word")); + builder.setBolt("bucketize", new Bucketize()).noneGrouping("count"); + builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket")); + + + LocalCluster cluster = new LocalCluster(); + + Config config = new Config(); + config.setDebug(true); + config.setMaxSpoutPending(3); + + cluster.submitTopology("top-n-topology", 
config, builder.buildTopology()); + + Thread.sleep(3000); + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java new file mode 100644 index 0000000..e4a5711 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities:
 * a Java spout emits random sentences, an external Python bolt splits them into
 * words, and a Java bolt keeps a running count per word (partitioned by a fields
 * grouping so every occurrence of a word lands on the same task).
 */
public class WordCountTopology {

    /**
     * Multilang bolt: delegates sentence splitting to the external script
     * {@code splitsentence.py} via the Storm shell-bolt protocol.
     */
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // One word per output tuple.
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // No per-component configuration overrides.
            return null;
        }
    }

    /**
     * Keeps a running per-word count in task-local memory and emits the updated
     * count for each incoming word.
     */
    public static class WordCount extends BaseBasicBolt {
        // NOTE(review): state is in-memory and task-local; counts reset if the worker restarts.
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer previous = counts.get(word);
            int updated = (previous == null) ? 1 : previous + 1;
            counts.put(word, updated);
            collector.emit(new Values(word, updated));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Submits the topology to a real cluster when a name argument is given,
     * otherwise runs it briefly in an in-process LocalCluster.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Fields grouping guarantees all tuples for a given word reach the same counter task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args == null || args.length == 0) {
            // Local mode: run in-process for ten seconds, then tear down.
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
        else {
            // Remote mode: the first argument is the topology name.
            conf.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
    }
}
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.*;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities
 * using Node.js components: a JavaScript spout emits random sentences, a
 * JavaScript bolt splits them into words, and a Java bolt counts each word.
 */
public class WordCountTopologyNode {

    /**
     * Multilang bolt: delegates sentence splitting to the external script
     * {@code splitsentence.js} run under Node.
     */
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("node", "splitsentence.js");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // No per-component configuration overrides.
            return null;
        }
    }

    /**
     * Multilang spout: emits sentences produced by the external script
     * {@code randomsentence.js} run under Node.
     */
    public static class RandomSentence extends ShellSpout implements IRichSpout {

        public RandomSentence() {
            super("node", "randomsentence.js");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // No per-component configuration overrides.
            return null;
        }
    }

    /**
     * Keeps a running per-word count in task-local memory and emits the updated
     * count for each incoming word.
     */
    public static class WordCount extends BaseBasicBolt {
        // NOTE(review): state is in-memory and task-local; counts reset if the worker restarts.
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer previous = counts.get(word);
            int updated = (previous == null) ? 1 : previous + 1;
            counts.put(word, updated);
            collector.emit(new Values(word, updated));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Submits the topology to a real cluster when a name argument is given,
     * otherwise runs it briefly in an in-process LocalCluster.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentence(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Fields grouping guarantees all tuples for a given word reach the same counter task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args == null || args.length == 0) {
            // Local mode: run in-process for ten seconds, then tear down.
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
        else {
            // Remote mode: the first argument is the topology name.
            conf.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.Config; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; +import org.apache.log4j.Logger; +import org.apache.storm.starter.tools.Rankings; + +import java.util.HashMap; +import java.util.Map; + +/** + * This abstract bolt provides the basic behavior of bolts that rank objects according to their count. + * <p/> + * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow + * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those + * tuples are retrieved and counted. 
+ */ +public abstract class AbstractRankerBolt extends BaseBasicBolt { + + private static final long serialVersionUID = 4931640198501530202L; + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2; + private static final int DEFAULT_COUNT = 10; + + private final int emitFrequencyInSeconds; + private final int count; + private final Rankings rankings; + + public AbstractRankerBolt() { + this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + public AbstractRankerBolt(int topN) { + this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); + } + + public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) { + if (topN < 1) { + throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")"); + } + if (emitFrequencyInSeconds < 1) { + throw new IllegalArgumentException( + "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)"); + } + count = topN; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + rankings = new Rankings(count); + } + + protected Rankings getRankings() { + return rankings; + } + + /** + * This method functions as a template method (design pattern). 
+ */ + @Override + public final void execute(Tuple tuple, BasicOutputCollector collector) { + if (TupleUtils.isTick(tuple)) { + getLogger().debug("Received tick tuple, triggering emit of current rankings"); + emitRankings(collector); + } + else { + updateRankingsWithTuple(tuple); + } + } + + abstract void updateRankingsWithTuple(Tuple tuple); + + private void emitRankings(BasicOutputCollector collector) { + collector.emit(new Values(rankings.copy())); + getLogger().debug("Rankings: " + rankings); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("rankings")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } + + abstract Logger getLogger(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java new file mode 100644 index 0000000..6950bfb --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.tuple.Tuple; +import org.apache.log4j.Logger; +import org.apache.storm.starter.tools.Rankable; +import org.apache.storm.starter.tools.RankableObjectWithFields; + +/** + * This bolt ranks incoming objects by their count. + * <p/> + * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1, + * additionalField2, ..., additionalFieldN). + */ +public final class IntermediateRankingsBolt extends AbstractRankerBolt { + + private static final long serialVersionUID = -1369800530256637409L; + private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class); + + public IntermediateRankingsBolt() { + super(); + } + + public IntermediateRankingsBolt(int topN) { + super(topN); + } + + public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) { + super(topN, emitFrequencyInSeconds); + } + + @Override + void updateRankingsWithTuple(Tuple tuple) { + Rankable rankable = RankableObjectWithFields.from(tuple); + super.getRankings().updateWith(rankable); + } + + @Override + Logger getLogger() { + return LOG; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java new file mode 100644 index 
0000000..993a937 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + + +public class PrinterBolt extends BaseBasicBolt { + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + System.out.println(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) { + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java new file mode 100644 index 0000000..45300de --- /dev/null +++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/RollingCountAggBolt.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.log4j.Logger; +import org.apache.storm.starter.tools.NthLastModifiedTimeTracker; +import org.apache.storm.starter.tools.SlidingWindowCounter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This bolt aggregates counts from multiple upstream bolts. 
+ */ +public class RollingCountAggBolt extends BaseRichBolt { + private static final long serialVersionUID = 5537727428628598519L; + private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class); + //Mapping of key->upstreamBolt->count + private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>(); + private OutputCollector collector; + + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + Object obj = tuple.getValue(0); + long count = tuple.getLong(1); + int source = tuple.getSourceTask(); + Map<Integer, Long> subCounts = counts.get(obj); + if (subCounts == null) { + subCounts = new HashMap<Integer, Long>(); + counts.put(obj, subCounts); + } + //Update the current count for this object + subCounts.put(source, count); + //Output the sum of all the known counts so for this key + long sum = 0; + for (Long val: subCounts.values()) { + sum += val; + } + collector.emit(new Values(obj, sum)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("obj", "count")); + } +}
