http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java index 4da1c85..1e57a94 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReachTopology.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.Arrays; @@ -23,7 +18,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.coordination.BatchOutputCollector; @@ -71,6 +65,34 @@ public class ReachTopology { put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); }}; + public static LinearDRPCTopologyBuilder construct() { + LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); + builder.addBolt(new GetTweeters(), 4); + builder.addBolt(new GetFollowers(), 12).shuffleGrouping(); + builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower")); + builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id")); + return builder; + } + + public static void main(String[] args) throws Exception { + LinearDRPCTopologyBuilder builder = construct(); + + Config conf = new Config(); + conf.setNumWorkers(6); + String topoName = "reach-drpc"; + if (args.length > 0) { + topoName = args[0]; + } + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology()); + + try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { + String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" }; + for (String url : urlsToTry) { + System.out.println("Reach of " + url + ": " + drpc.execute("reach", url)); + } + } + } + public static class GetTweeters extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { @@ -162,32 +184,4 @@ public class ReachTopology { declarer.declare(new Fields("id", "reach")); } } - - public static LinearDRPCTopologyBuilder construct() { - LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); - builder.addBolt(new GetTweeters(), 4); - builder.addBolt(new GetFollowers(), 12).shuffleGrouping(); - builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower")); - builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id")); - return builder; - } - - public static void main(String[] args) throws Exception { - LinearDRPCTopologyBuilder builder = construct(); - - Config conf = new Config(); - conf.setNumWorkers(6); - String topoName = "reach-drpc"; - if (args.length > 0) { - topoName = args[0]; - } - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology()); - - try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { - String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" }; - for (String url : urlsToTry) { - System.out.println("Reach of " + url + ": " + drpc.execute("reach", url)); - } - } - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java index 07149e8..18af7ea 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.starter; @@ -22,7 +16,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.task.OutputCollector; @@ -39,53 +32,6 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class ResourceAwareExampleTopology { - public static class ExclamationBolt extends BaseRichBolt { - //Have a crummy cache to show off shared memory accounting - private static final ConcurrentHashMap<String, String> myCrummyCache = - new ConcurrentHashMap<>(); - private static final int CACHE_SIZE = 100_000; - OutputCollector _collector; - - protected static String getFromCache(String key) { - return myCrummyCache.get(key); - } - - protected static void addToCache(String key, String value) { - myCrummyCache.putIfAbsent(key, value); - int numToRemove = myCrummyCache.size() - CACHE_SIZE; - if (numToRemove > 0) { - //Remove something randomly... - Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator(); - for (; numToRemove > 0 && it.hasNext(); numToRemove--) { - it.next(); - it.remove(); - } - } - } - - @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - @Override - public void execute(Tuple tuple) { - String orig = tuple.getString(0); - String ret = getFromCache(orig); - if (ret == null) { - ret = orig + "!!!"; - addToCache(orig, ret); - } - _collector.emit(tuple, new Values(ret)); - _collector.ack(tuple); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); @@ -154,4 +100,51 @@ public class ResourceAwareExampleTopology { StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); } + + public static class ExclamationBolt extends BaseRichBolt { + //Have a crummy cache to show off shared memory accounting + private static final ConcurrentHashMap<String, String> myCrummyCache = + new ConcurrentHashMap<>(); + private static final int CACHE_SIZE = 100_000; + OutputCollector _collector; + + protected static String getFromCache(String key) { + return myCrummyCache.get(key); + } + + protected static void addToCache(String key, String value) { + myCrummyCache.putIfAbsent(key, value); + int numToRemove = myCrummyCache.size() - CACHE_SIZE; + if (numToRemove > 0) { + //Remove something randomly... + Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator(); + for (; numToRemove > 0 && it.hasNext(); numToRemove--) { + it.next(); + it.remove(); + } + } + } + + @Override + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { + _collector = collector; + } + + @Override + public void execute(Tuple tuple) { + String orig = tuple.getString(0); + String ret = getFromCache(orig); + if (ret == null) { + ret = orig + "!!!"; + addToCache(orig, ret); + } + _collector.emit(tuple, new Values(ret)); + _collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java index 78b2baf..23f56c2 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import org.apache.storm.starter.bolt.IntermediateRankingsBolt; @@ -35,54 +30,54 @@ import org.slf4j.LoggerFactory; */ public class RollingTopWords extends ConfigurableTopology { - private static final Logger LOG = LoggerFactory.getLogger(RollingTopWords.class); - private static final int TOP_N = 5; - - private RollingTopWords() { - } + private static final Logger LOG = LoggerFactory.getLogger(RollingTopWords.class); + private static final int TOP_N = 5; - public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new RollingTopWords(), args); - } + private RollingTopWords() { + } - /** - * Submits (runs) the topology. - * - * Usage: "RollingTopWords [topology-name] [-local]" - * - * By default, the topology is run locally under the name - * "slidingWindowCounts". - * - * Examples: - * - * ``` - * - * # Runs in remote/cluster mode, with topology name "production-topology" - * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology ``` - * - * @param args - * First positional argument (optional) is topology name, second - * positional argument (optional) defines whether to run the topology - * locally ("-local") or remotely, i.e. on a real cluster. - * @throws Exception - */ - protected int run(String[] args) { - String topologyName = "slidingWindowCounts"; - if (args.length >= 1) { - topologyName = args[0]; + public static void main(String[] args) throws Exception { + ConfigurableTopology.start(new RollingTopWords(), args); } - TopologyBuilder builder = new TopologyBuilder(); - String spoutId = "wordGenerator"; - String counterId = "counter"; - String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; - builder.setSpout(spoutId, new TestWordSpout(), 5); - builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); - builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, - new Fields("obj")); - builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); - LOG.info("Topology name: " + topologyName); - return submit(topologyName, conf, builder); - } + /** + * Submits (runs) the topology. + * + * Usage: "RollingTopWords [topology-name] [-local]" + * + * By default, the topology is run locally under the name + * "slidingWindowCounts". + * + * Examples: + * + * ``` + * + * # Runs in remote/cluster mode, with topology name "production-topology" + * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords production-topology ``` + * + * @param args + * First positional argument (optional) is topology name, second + * positional argument (optional) defines whether to run the topology + * locally ("-local") or remotely, i.e. on a real cluster. + * @throws Exception + */ + protected int run(String[] args) { + String topologyName = "slidingWindowCounts"; + if (args.length >= 1) { + topologyName = args[0]; + } + TopologyBuilder builder = new TopologyBuilder(); + String spoutId = "wordGenerator"; + String counterId = "counter"; + String intermediateRankerId = "intermediateRanker"; + String totalRankerId = "finalRanker"; + builder.setSpout(spoutId, new TestWordSpout(), 5); + builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); + builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, + new Fields("obj")); + builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); + LOG.info("Topology name: " + topologyName); + + return submit(topologyName, conf, builder); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java index 2e1bb94..4f4b6b6 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SingleJoinExample.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import org.apache.storm.Config; @@ -34,7 +29,7 @@ public class SingleJoinExample { public static void main(String[] args) throws Exception { if (!NimbusClient.isLocalOverride()) { throw new IllegalStateException("This example only works in local mode. " - + "Run with storm local not storm jar"); + + "Run with storm local not storm jar"); } FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); @@ -43,7 +38,7 @@ public class SingleJoinExample { builder.setSpout("gender", genderSpout); builder.setSpout("age", ageSpout); builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) - .fieldsGrouping("age", new Fields("id")); + .fieldsGrouping("age", new Fields("id")); Config conf = new Config(); conf.setDebug(true); @@ -53,8 +48,7 @@ public class SingleJoinExample { String gender; if (i % 2 == 0) { gender = "male"; - } - else { + } else { gender = "female"; } genderSpout.feed(new Values(i, gender)); http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java index 4e18217..a3b8296 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import org.apache.storm.starter.bolt.IntermediateRankingsBolt; @@ -38,55 +33,55 @@ import org.slf4j.LoggerFactory; */ public class SkewedRollingTopWords extends ConfigurableTopology { - private static final Logger LOG = LoggerFactory.getLogger(SkewedRollingTopWords.class); - private static final int TOP_N = 5; - - private SkewedRollingTopWords() { - } + private static final Logger LOG = LoggerFactory.getLogger(SkewedRollingTopWords.class); + private static final int TOP_N = 5; - public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new SkewedRollingTopWords(), args); - } + private SkewedRollingTopWords() { + } - /** - * Submits (runs) the topology. - * - * Usage: "SkewedRollingTopWords [topology-name] [-local]" - * - * By default, the topology is run locally under the name - * "slidingWindowCounts". - * - * Examples: - * - * ``` - * - * # Runs in remote/cluster mode, with topology name "production-topology" - * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology ``` - * - * @param args - * First positional argument (optional) is topology name, second - * positional argument (optional) defines whether to run the topology - * locally ("-local") or remotely, i.e. on a real cluster. - * @throws Exception - */ - protected int run(String[] args) { - String topologyName = "slidingWindowCounts"; - if (args.length >= 1) { - topologyName = args[0]; + public static void main(String[] args) throws Exception { + ConfigurableTopology.start(new SkewedRollingTopWords(), args); } - TopologyBuilder builder = new TopologyBuilder(); - String spoutId = "wordGenerator"; - String counterId = "counter"; - String aggId = "aggregator"; - String intermediateRankerId = "intermediateRanker"; - String totalRankerId = "finalRanker"; - builder.setSpout(spoutId, new TestWordSpout(), 5); - builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word")); - builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj")); - builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj")); - builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); - LOG.info("Topology name: " + topologyName); - return submit(topologyName, conf, builder); - } + /** + * Submits (runs) the topology. + * + * Usage: "SkewedRollingTopWords [topology-name] [-local]" + * + * By default, the topology is run locally under the name + * "slidingWindowCounts". + * + * Examples: + * + * ``` + * + * # Runs in remote/cluster mode, with topology name "production-topology" + * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords production-topology ``` + * + * @param args + * First positional argument (optional) is topology name, second + * positional argument (optional) defines whether to run the topology + * locally ("-local") or remotely, i.e. on a real cluster. + * @throws Exception + */ + protected int run(String[] args) { + String topologyName = "slidingWindowCounts"; + if (args.length >= 1) { + topologyName = args[0]; + } + TopologyBuilder builder = new TopologyBuilder(); + String spoutId = "wordGenerator"; + String counterId = "counter"; + String aggId = "aggregator"; + String intermediateRankerId = "intermediateRanker"; + String totalRankerId = "finalRanker"; + builder.setSpout(spoutId, new TestWordSpout(), 5); + builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word")); + builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj")); + builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj")); + builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); + LOG.info("Topology name: " + topologyName); + + return submit(topologyName, conf, builder); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java index 6204f8c..d179899 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingTupleTsTopology.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.concurrent.TimeUnit; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.starter.bolt.PrinterBolt; @@ -36,20 +30,20 @@ public class SlidingTupleTsTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); BaseWindowedBolt bolt = new SlidingWindowSumBolt() - .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)) - .withTimestampField("ts") - .withLag(new Duration(5, TimeUnit.SECONDS)); + .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)) + .withTimestampField("ts") + .withLag(new Duration(5, TimeUnit.SECONDS)); builder.setSpout("integer", new RandomIntegerSpout(), 1); builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer"); builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum"); Config conf = new Config(); conf.setDebug(true); String topoName = "test"; - + if (args != null && args.length > 0) { topoName = args[0]; } - + conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java index fc7cf4a..a75f999 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java @@ -1,26 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.starter; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.starter.bolt.PrinterBolt; @@ -47,6 +40,24 @@ public class SlidingWindowTopology { private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class); + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("integer", new RandomIntegerSpout(), 1); + builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1) + .shuffleGrouping("integer"); + builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1) + .shuffleGrouping("slidingsum"); + builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg"); + Config conf = new Config(); + conf.setDebug(true); + String topoName = "test"; + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } + /* * Computes tumbling window average */ @@ -65,9 +76,9 @@ public class SlidingWindowTopology { LOG.debug("Events in current window: " + tuplesInWindow.size()); if (tuplesInWindow.size() > 0) { /* - * Since this is a tumbling window calculation, - * we use all the tuples in the window to compute the avg. - */ + * Since this is a tumbling window calculation, + * we use all the tuples in the window to compute the avg. + */ for (Tuple tuple : tuplesInWindow) { sum += (int) tuple.getValue(0); } @@ -80,23 +91,4 @@ public class SlidingWindowTopology { declarer.declare(new Fields("avg")); } } - - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("integer", new RandomIntegerSpout(), 1); - builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1) - .shuffleGrouping("integer"); - builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1) - .shuffleGrouping("slidingsum"); - builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg"); - Config conf = new Config(); - conf.setDebug(true); - String topoName = "test"; - if (args != null && args.length > 0) { - topoName = args[0]; - } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java index e407ce8..3f30f48 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java @@ -19,7 +19,6 @@ package org.apache.storm.starter; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.starter.spout.RandomIntegerSpout; @@ -38,20 +37,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt} - * to manage state. To run the example, + * An example topology that demonstrates the use of {@link org.apache.storm.topology.IStatefulBolt} to manage state. To run the example, * <pre> * $ storm jar examples/storm-starter/storm-starter-topologies-*.jar storm.starter.StatefulTopology statetopology * </pre> * <p/> - * The default state used is 'InMemoryKeyValueState' which does not persist the state across restarts. You could use - * 'RedisKeyValueState' to test state persistence by setting below property in conf/storm.yaml + * The default state used is 'InMemoryKeyValueState' which does not persist the state across restarts. You could use 'RedisKeyValueState' to + * test state persistence by setting below property in conf/storm.yaml * <pre> * topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider * </pre> * <p/> - * You should also start a local redis instance before running the 'storm jar' command. The default - * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g. + * You should also start a local redis instance before running the 'storm jar' command. The default RedisKeyValueStateProvider parameters + * can be overridden in conf/storm.yaml, for e.g. * <p/> * <pre> * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...", @@ -65,6 +63,22 @@ import org.slf4j.LoggerFactory; public class StatefulTopology { private static final Logger LOG = LoggerFactory.getLogger(StatefulTopology.class); + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new RandomIntegerSpout()); + builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout"); + builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum"); + builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer"); + Config conf = new Config(); + conf.setDebug(false); + String topoName = "test"; + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } + /** * A bolt that uses {@link KeyValueState} to save its state. */ @@ -119,20 +133,4 @@ public class StatefulTopology { } } - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomIntegerSpout()); - builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout"); - builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum"); - builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer"); - Config conf = new Config(); - conf.setDebug(false); - String topoName = "test"; - if (args != null && args.length > 0) { - topoName = args[0]; - } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java index 30c1ab2..eb0132e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java @@ -19,7 +19,6 @@ package org.apache.storm.starter; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.starter.bolt.PrinterBolt; @@ -40,18 +39,37 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to - * save the state of the windowing operation to avoid re-computation in case of failures. + * A simple example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} to save the state of the + * windowing operation to avoid re-computation in case of failures. * <p> * The framework internally manages the window boundaries and does not invoke - * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} for the already evaluated windows in case of restarts - * during failures. The {@link org.apache.storm.topology.IStatefulBolt#initState(State)} + * {@link org.apache.storm.topology.IWindowedBolt#execute(TupleWindow)} + * for the already evaluated windows in case of restarts during failures. The + * {@link org.apache.storm.topology.IStatefulBolt#initState(State)} * is invoked with the previously saved state of the bolt after prepare, before the execute() method is invoked. * </p> */ public class StatefulWindowingTopology { private static final Logger LOG = LoggerFactory.getLogger(StatefulWindowingTopology.class); + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout", new RandomIntegerSpout()); + builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3)) + .withMessageIdField("msgid"), 1).shuffleGrouping("spout"); + builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt"); + Config conf = new Config(); + conf.setDebug(false); + //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider"); + + String topoName = "test"; + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } + private static class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> { private KeyValueState<String, Long> state; private long sum; @@ -85,22 +103,4 @@ public class StatefulWindowingTopology { } } - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomIntegerSpout()); - builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3)) - .withMessageIdField("msgid"), 1).shuffleGrouping("spout"); - builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt"); - Config conf = new Config(); - conf.setDebug(false); - //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider"); - - String topoName = "test"; - if (args != null && args.length > 0) { - topoName = args[0]; - } - conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java index 363fc2c..45cd68d 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.math.BigInteger; @@ -22,7 +17,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.coordination.BatchOutputCollector; @@ -74,15 +68,31 @@ public class TransactionalGlobalCount { add(new Values("dog")); }}); }}; + public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT"; + public static Map<String, Value> DATABASE = new HashMap<String, Value>(); + + public static void main(String[] args) throws Exception { + if (!NimbusClient.isLocalOverride()) { + throw new IllegalStateException("This example only works in local mode. " + + "Run with storm local not storm jar"); + } + MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); + TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3); + builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout"); + builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count"); + + Config config = new Config(); + config.setDebug(true); + config.setMaxSpoutPending(3); + + StormSubmitter.submitTopology("global-count-topology", config, builder.buildTopology()); + } public static class Value { int count = 0; BigInteger txid; } - public static Map<String, Value> DATABASE = new HashMap<String, Value>(); - public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT"; - public static class BatchCount extends BaseBatchBolt<Object> { Object _id; BatchOutputCollector _collector; @@ -137,13 +147,11 @@ public class TransactionalGlobalCount { newval.txid = _attempt.getTransactionId(); if (val == null) { newval.count = _sum; - } - else { + } else { newval.count = _sum + val.count; } DATABASE.put(GLOBAL_COUNT_KEY, newval); - } - else { + } else { newval = val; } _collector.emit(new Values(_attempt, newval.count)); @@ -154,21 +162,4 @@ public class TransactionalGlobalCount { declarer.declare(new Fields("id", "sum")); } } - - public static void main(String[] args) throws Exception { - if (!NimbusClient.isLocalOverride()) { - throw new IllegalStateException("This example only works in local mode. " - + "Run with storm local not storm jar"); - } - MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); - TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3); - builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout"); - builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count"); - - Config config = new Config(); - config.setDebug(true); - config.setMaxSpoutPending(3); - - StormSubmitter.submitTopology("global-count-topology", config, builder.buildTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java index 7e343af..7875360 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.math.BigInteger; @@ -22,7 +17,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.coordination.BatchOutputCollector; @@ -51,25 +45,8 @@ import org.apache.storm.utils.NimbusClient; * between buckets as their counts accumulate. */ public class TransactionalWords { - public static class CountValue { - Integer prev_count = null; - int count = 0; - BigInteger txid = null; - } - - public static class BucketValue { - int count = 0; - BigInteger txid; - } - public static final int BUCKET_SIZE = 10; - - public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>(); - public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>(); - - public static final int PARTITION_TAKE_PER_BATCH = 3; - public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{ put(0, new ArrayList<List<Object>>() {{ add(new Values("cat")); @@ -97,6 +74,36 @@ public class TransactionalWords { add(new Values("dog")); }}); }}; + public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>(); + public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>(); + + public static void main(String[] args) throws Exception { + if (!NimbusClient.isLocalOverride()) { + throw new IllegalStateException("This example only works in local mode. " + + "Run with storm local not storm jar"); + } + MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); + TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2); + builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word")); + builder.setBolt("bucketize", new Bucketize()).noneGrouping("count"); + builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket")); + Config config = new Config(); + config.setDebug(true); + config.setMaxSpoutPending(3); + + StormSubmitter.submitTopology("top-n-topology", config, builder.buildTopology()); + } + + public static class CountValue { + Integer prev_count = null; + int count = 0; + BigInteger txid = null; + } + + public static class BucketValue { + int count = 0; + BigInteger txid; + } public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter { Map<String, Integer> _counts = new HashMap<String, Integer>(); @@ -115,8 +122,9 @@ public class TransactionalWords { public void execute(Tuple tuple) { String key = tuple.getString(1); Integer curr = _counts.get(key); - if (curr == null) + if (curr == null) { curr = 0; + } _counts.put(key, curr + 1); } @@ -134,8 +142,7 @@ public class TransactionalWords { } newVal.count = newVal.count + _counts.get(key); COUNT_DATABASE.put(key, newVal); - } - else { + } else { newVal = val; } _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count)); @@ -163,8 +170,7 @@ public class TransactionalWords { if (prevBucket == null) { collector.emit(new Values(attempt, currBucket, 1)); - } - else if (currBucket != prevBucket) { + } else if (currBucket != prevBucket) { collector.emit(new Values(attempt, currBucket, 1)); collector.emit(new Values(attempt, prevBucket, -1)); } @@ -194,8 +200,9 @@ public class TransactionalWords { Integer bucket = tuple.getInteger(1); Integer delta = tuple.getInteger(2); Integer curr = _accum.get(bucket); - if (curr == null) + if (curr == null) { curr = 0; + } _accum.put(bucket, curr + delta); } @@ -208,11 +215,11 @@ public class TransactionalWords { newVal = new BucketValue(); newVal.txid = _attempt.getTransactionId(); newVal.count = _accum.get(bucket); - if (currVal != null) + if (currVal != null) { newVal.count += currVal.count; + } BUCKET_DATABASE.put(bucket, newVal); - } - else { + } else { newVal = currVal; } _collector.emit(new Values(_attempt, bucket, newVal.count)); @@ -224,21 +231,4 @@ public class TransactionalWords { declarer.declare(new Fields("id", "bucket", "count")); } } - - public static void main(String[] args) throws Exception { - if (!NimbusClient.isLocalOverride()) { - throw new IllegalStateException("This example only works in local mode. " - + "Run with storm local not storm jar"); - } - MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); - TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2); - builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word")); - builder.setBolt("bucketize", new Bucketize()).noneGrouping("count"); - builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket")); - Config config = new Config(); - config.setDebug(true); - config.setMaxSpoutPending(3); - - StormSubmitter.submitTopology("top-n-topology", config, builder.buildTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java index 59e3e4b..32ec822 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.HashMap; import java.util.Map; - import org.apache.storm.starter.spout.RandomSentenceSpout; import org.apache.storm.task.ShellBolt; import org.apache.storm.topology.BasicOutputCollector; @@ -37,65 +31,66 @@ import org.apache.storm.tuple.Values; * capabilities. */ public class WordCountTopology extends ConfigurableTopology { - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("python", "splitsentence.py"); + public static void main(String[] args) throws Exception { + ConfigurableTopology.start(new WordCountTopology(), args); } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } + protected int run(String[] args) throws Exception { - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } + TopologyBuilder builder = new TopologyBuilder(); - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } + builder.setSpout("spout", new RandomSentenceSpout(), 5); - public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new WordCountTopology(), args); - } + builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); - protected int run(String[] args) throws Exception { + conf.setDebug(true); - TopologyBuilder builder = new TopologyBuilder(); + String topologyName = "word-count"; - builder.setSpout("spout", new RandomSentenceSpout(), 5); + conf.setNumWorkers(3); - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); + if (args != null && args.length > 0) { + topologyName = args[0]; + } + return submit(topologyName, conf, builder); + } - conf.setDebug(true); + public static class SplitSentence extends ShellBolt implements IRichBolt { - String topologyName = "word-count"; + public SplitSentence() { + super("python", "splitsentence.py"); + } - conf.setNumWorkers(3); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + } - if (args != null && args.length > 0) { - topologyName = args[0]; + public static class WordCount extends BaseBasicBolt { + Map<String, Integer> counts = new HashMap<String, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } } - return submit(topologyName, conf, builder); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java index 39f5932..6df7678 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter; import java.util.HashMap; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.spout.ShellSpout; @@ -38,6 +32,25 @@ import org.apache.storm.tuple.Values; * This topology demonstrates Storm's stream groupings and multilang capabilities. */ public class WordCountTopologyNode { + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentence(), 5); + + builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setDebug(true); + String topoName = "word-count"; + if (args != null && args.length > 0) { + topoName = args[0]; + } + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); + } + public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { @@ -79,8 +92,9 @@ public class WordCountTopologyNode { public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); - if (count == null) + if (count == null) { count = 0; + } count++; counts.put(word, count); collector.emit(new Values(word, count)); @@ -91,23 +105,4 @@ public class WordCountTopologyNode { declarer.declare(new Fields("word", "count")); } } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new RandomSentence(), 5); - - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.setDebug(true); - String topoName = "word-count"; - if (args != null && args.length > 0) { - topoName = args[0]; - } - conf.setNumWorkers(3); - StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology()); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java index 9cf9e79..b04196c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/AbstractRankerBolt.java @@ -1,23 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.bolt; +import java.util.HashMap; +import java.util.Map; +import org.apache.log4j.Logger; import org.apache.storm.Config; +import org.apache.storm.starter.tools.Rankings; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; @@ -25,11 +24,6 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.TupleUtils; -import org.apache.log4j.Logger; -import org.apache.storm.starter.tools.Rankings; - -import java.util.HashMap; -import java.util.Map; /** * This abstract bolt provides the basic behavior of bolts that rank objects according to their count. @@ -40,71 +34,70 @@ import java.util.Map; */ public abstract class AbstractRankerBolt extends BaseBasicBolt { - private static final long serialVersionUID = 4931640198501530202L; - private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2; - private static final int DEFAULT_COUNT = 10; - - private final int emitFrequencyInSeconds; - private final int count; - private final Rankings rankings; - - public AbstractRankerBolt() { - this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); - } + private static final long serialVersionUID = 4931640198501530202L; + private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2; + private static final int DEFAULT_COUNT = 10; - public AbstractRankerBolt(int topN) { - this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); - } + private final int emitFrequencyInSeconds; + private final int count; + private final Rankings rankings; - public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) { - if (topN < 1) { - throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")"); + public AbstractRankerBolt() { + this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); } - if (emitFrequencyInSeconds < 1) { - throw new IllegalArgumentException( - "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)"); + + public AbstractRankerBolt(int topN) { + this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS); } - count = topN; - this.emitFrequencyInSeconds = emitFrequencyInSeconds; - rankings = new Rankings(count); - } - protected Rankings getRankings() { - return rankings; - } + public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) { + if (topN < 1) { + throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")"); + } + if (emitFrequencyInSeconds < 1) { + throw new IllegalArgumentException( + "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)"); + } + count = topN; + this.emitFrequencyInSeconds = emitFrequencyInSeconds; + rankings = new Rankings(count); + } - /** - * This method functions as a template method (design pattern). - */ - @Override - public final void execute(Tuple tuple, BasicOutputCollector collector) { - if (TupleUtils.isTick(tuple)) { - getLogger().debug("Received tick tuple, triggering emit of current rankings"); - emitRankings(collector); + protected Rankings getRankings() { + return rankings; } - else { - updateRankingsWithTuple(tuple); + + /** + * This method functions as a template method (design pattern). + */ + @Override + public final void execute(Tuple tuple, BasicOutputCollector collector) { + if (TupleUtils.isTick(tuple)) { + getLogger().debug("Received tick tuple, triggering emit of current rankings"); + emitRankings(collector); + } else { + updateRankingsWithTuple(tuple); + } } - } - abstract void updateRankingsWithTuple(Tuple tuple); + abstract void updateRankingsWithTuple(Tuple tuple); - private void emitRankings(BasicOutputCollector collector) { - collector.emit(new Values(rankings.copy())); - getLogger().debug("Rankings: " + rankings); - } + private void emitRankings(BasicOutputCollector collector) { + collector.emit(new Values(rankings.copy())); + getLogger().debug("Rankings: " + rankings); + } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("rankings")); - } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("rankings")); + } - @Override - public Map<String, Object> getComponentConfiguration() { - Map<String, Object> conf = new HashMap<String, Object>(); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); - return conf; - } + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); + return conf; + } - abstract Logger getLogger(); + abstract Logger getLogger(); } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java index 6950bfb..a6a8b49 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBolt.java @@ -1,26 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.bolt; -import org.apache.storm.tuple.Tuple; import org.apache.log4j.Logger; import org.apache.storm.starter.tools.Rankable; import org.apache.storm.starter.tools.RankableObjectWithFields; +import org.apache.storm.tuple.Tuple; /** * This bolt ranks incoming objects by their count. @@ -30,29 +25,29 @@ import org.apache.storm.starter.tools.RankableObjectWithFields; */ public final class IntermediateRankingsBolt extends AbstractRankerBolt { - private static final long serialVersionUID = -1369800530256637409L; - private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class); + private static final long serialVersionUID = -1369800530256637409L; + private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class); - public IntermediateRankingsBolt() { - super(); - } + public IntermediateRankingsBolt() { + super(); + } - public IntermediateRankingsBolt(int topN) { - super(topN); - } + public IntermediateRankingsBolt(int topN) { + super(topN); + } - public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) { - super(topN, emitFrequencyInSeconds); - } + public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) { + super(topN, emitFrequencyInSeconds); + } - @Override - void updateRankingsWithTuple(Tuple tuple) { - Rankable rankable = RankableObjectWithFields.from(tuple); - super.getRankings().updateWith(rankable); - } + @Override + void updateRankingsWithTuple(Tuple tuple) { + Rankable rankable = RankableObjectWithFields.from(tuple); + super.getRankings().updateWith(rankable); + } - @Override - Logger getLogger() { - return LOG; - } + @Override + Logger getLogger() { + return LOG; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java index 993a937..8364222 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/bolt/PrinterBolt.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.bolt; import org.apache.storm.topology.BasicOutputCollector; @@ -25,13 +20,13 @@ import org.apache.storm.tuple.Tuple; public class PrinterBolt extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - System.out.println(tuple); - } + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + System.out.println(tuple); + } - @Override - public void declareOutputFields(OutputFieldsDeclarer ofd) { - } + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) { + } }
