address several review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0be278a4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0be278a4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0be278a4 Branch: refs/heads/1.x-branch Commit: 0be278a4a8ae8aba5a0e86fc54e69b5c044b5377 Parents: 9a8dfb7 Author: P. Taylor Goetz <[email protected]> Authored: Fri Aug 11 15:29:53 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Fri Aug 11 15:29:53 2017 -0400 ---------------------------------------------------------------------- .../apache/storm/starter/AnchoredWordCount.java | 138 +++++++++++++++++++ .../apache/storm/starter/ReliableWordCount.java | 121 ---------------- storm-core/pom.xml | 4 - .../storm/metrics2/StormMetricRegistry.java | 18 ++- .../org/apache/storm/task/TopologyContext.java | 10 +- 5 files changed, 154 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java new file mode 100644 index 0000000..3b22c9f --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + + +public class AnchoredWordCount { + public static class RandomSentenceSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + + SpoutOutputCollector _collector; + Random _rand; + + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _rand = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(10); + String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + final String sentence = sentences[_rand.nextInt(sentences.length)]; + + _collector.emit(new Values(sentence), UUID.randomUUID()); + } + + protected String sentence(String input) { + return input; + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } + + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map<String, Integer> counts = new HashMap<String, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setMaxTaskParallelism(3); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("word-count", conf, builder.createTopology()); + + Thread.sleep(600000); + + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java deleted file mode 100644 index f05b521..0000000 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.apache.storm.starter; - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.UUID; - - -public class ReliableWordCount { - public static class RandomSentenceSpout extends BaseRichSpout { - private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); - - SpoutOutputCollector _collector; - Random _rand; - - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = new Random(); - } - - @Override - public void nextTuple() { - Utils.sleep(10); - String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), - sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; - final String sentence = sentences[_rand.nextInt(sentences.length)]; - - _collector.emit(new Values(sentence), UUID.randomUUID()); - } - - protected String sentence(String input) { - return input; - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - - public static class SplitSentence extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { - collector.emit(new Values(word, 1)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static class WordCount extends BaseBasicBolt { - Map<String, Integer> counts = new HashMap<String, Integer>(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) - count = 0; - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void main(String[] args) throws Exception { - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new RandomSentenceSpout(), 4); - - builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.setMaxTaskParallelism(3); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("word-count", conf, builder.createTopology()); - - Thread.sleep(600000); - - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index e10222a..499a404 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -707,10 +707,6 @@ <pattern>org.eclipse.jetty</pattern> <shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern> </relocation> - <!--<relocation>--> - <!--<pattern>com.codahale.metrics</pattern>--> - <!--<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>--> - <!--</relocation>--> <relocation> <pattern>metrics.core</pattern> <shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern> http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index ced1233..845745f 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -45,9 +45,9 @@ public class StormMetricRegistry { public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){ SimpleGauge<T> gauge = new SimpleGauge<>(initialValue); - String metricName = String.format("storm.worker.%s.%s-%s", topologyId, port, name); - if(REGISTRY.getGauges().containsKey(metricName)){ - return (SimpleGauge)REGISTRY.getGauges().get(metricName); + String metricName = metricName(name, topologyId, null, port); + if(REGISTRY.getGauges().containsKey(metricName)){ + return (SimpleGauge)REGISTRY.getGauges().get(metricName); } else { return REGISTRY.register(metricName, gauge); } @@ -67,9 +67,7 @@ public class StormMetricRegistry { } public static Meter meter(String name, WorkerTopologyContext context, String componentId){ - // storm.worker.{topology}.{host}.{port} - String metricName = String.format("storm.worker.%s.%s.%s.%s-%s", context.getStormId(), hostName, - componentId, context.getThisWorkerPort(), name); + String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); return REGISTRY.meter(metricName); } @@ -99,7 +97,7 @@ public class StormMetricRegistry { } } - public static MetricRegistry registtry(){ + public static MetricRegistry registry(){ return REGISTRY; } @@ -130,4 +128,10 @@ public class StormMetricRegistry { sr.stop(); } } + + public static String metricName(String name, String stormId, String componentId, Integer workerPort){ + return String.format("storm.worker.%s.%s.%s.%s-%s", stormId, hostName, componentId, workerPort, name); + } + + } http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java index 080eb9a..444a8a7 100644 --- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java +++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java @@ -390,23 +390,23 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } public Timer registerTimer(String name){ - return StormMetricRegistry.registtry().timer(metricName(name)); + return StormMetricRegistry.registry().timer(metricName(name)); } public Histogram registerHistogram(String name){ - return StormMetricRegistry.registtry().histogram(metricName(name)); + return StormMetricRegistry.registry().histogram(metricName(name)); } public Meter registerMeter(String name){ - return StormMetricRegistry.registtry().meter(metricName(name)); + return StormMetricRegistry.registry().meter(metricName(name)); } public Counter registerCounter(String name){ - return StormMetricRegistry.registtry().counter(metricName(name)); + return StormMetricRegistry.registry().counter(metricName(name)); } public Gauge registerGauge(String name, Gauge gauge){ - return StormMetricRegistry.registtry().register(metricName(name), gauge); + return StormMetricRegistry.registry().register(metricName(name), gauge); } private String metricName(String name){
