Repository: storm
Updated Branches:
  refs/heads/metrics_v2 9a8dfb7d0 -> 6eaa1a85a


address several review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0be278a4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0be278a4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0be278a4

Branch: refs/heads/metrics_v2
Commit: 0be278a4a8ae8aba5a0e86fc54e69b5c044b5377
Parents: 9a8dfb7
Author: P. Taylor Goetz <ptgo...@gmail.com>
Authored: Fri Aug 11 15:29:53 2017 -0400
Committer: P. Taylor Goetz <ptgo...@gmail.com>
Committed: Fri Aug 11 15:29:53 2017 -0400

----------------------------------------------------------------------
 .../apache/storm/starter/AnchoredWordCount.java | 138 +++++++++++++++++++
 .../apache/storm/starter/ReliableWordCount.java | 121 ----------------
 storm-core/pom.xml                              |   4 -
 .../storm/metrics2/StormMetricRegistry.java     |  18 ++-
 .../org/apache/storm/task/TopologyContext.java  |  10 +-
 5 files changed, 154 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
new file mode 100644
index 0000000..3b22c9f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class AnchoredWordCount {
+    public static class RandomSentenceSpout extends BaseRichSpout {
+        private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+        SpoutOutputCollector _collector;
+        Random _rand;
+
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = new Random();
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(10);
+            String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
+                    sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
+            final String sentence = sentences[_rand.nextInt(sentences.length)];
+
+            _collector.emit(new Values(sentence), UUID.randomUUID());
+        }
+
+        protected String sentence(String input) {
+            return input;
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(3);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        Thread.sleep(600000);
+
+        cluster.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
deleted file mode 100644
index f05b521..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package org.apache.storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-
-
-public class ReliableWordCount {
-    public static class RandomSentenceSpout extends BaseRichSpout {
-        private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
-
-        SpoutOutputCollector _collector;
-        Random _rand;
-
-
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-            _rand = new Random();
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(10);
-            String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
-                    sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
-            final String sentence = sentences[_rand.nextInt(sentences.length)];
-
-            _collector.emit(new Values(sentence), UUID.randomUUID());
-        }
-
-        protected String sentence(String input) {
-            return input;
-        }
-
-        @Override
-        public void ack(Object id) {
-        }
-
-        @Override
-        public void fail(Object id) {
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-    }
-
-
-    public static class SplitSentence extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
-                collector.emit(new Values(word, 1));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null)
-                count = 0;
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new RandomSentenceSpout(), 4);
-
-        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
-
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(3);
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("word-count", conf, builder.createTopology());
-
-        Thread.sleep(600000);
-
-        cluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index e10222a..499a404 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -707,10 +707,6 @@
                           <pattern>org.eclipse.jetty</pattern>
                           
<shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern>
                         </relocation>
-                        <!--<relocation>-->
-                            <!--<pattern>com.codahale.metrics</pattern>-->
-                            <!--<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>-->
-                        <!--</relocation>-->
                         <relocation>
                             <pattern>metrics.core</pattern>
                             
<shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>

http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index ced1233..845745f 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -45,9 +45,9 @@ public class StormMetricRegistry {
 
     public static <T> SimpleGauge<T>  gauge(T initialValue, String name, String topologyId, Integer port){
         SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
-        String metricName = String.format("storm.worker.%s.%s-%s", topologyId, port, name);
-        if(REGISTRY.getGauges().containsKey(metricName)){
-            return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+        String metricName = metricName(name, topologyId, null, port);
+            if(REGISTRY.getGauges().containsKey(metricName)){
+                return (SimpleGauge)REGISTRY.getGauges().get(metricName);
         } else {
             return REGISTRY.register(metricName, gauge);
         }
@@ -67,9 +67,7 @@ public class StormMetricRegistry {
     }
 
     public static Meter meter(String name, WorkerTopologyContext context, String componentId){
-        // storm.worker.{topology}.{host}.{port}
-        String metricName = String.format("storm.worker.%s.%s.%s.%s-%s", context.getStormId(), hostName,
-                componentId, context.getThisWorkerPort(), name);
+        String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
         return REGISTRY.meter(metricName);
     }
 
@@ -99,7 +97,7 @@ public class StormMetricRegistry {
         }
     }
 
-    public static MetricRegistry registtry(){
+    public static MetricRegistry registry(){
         return REGISTRY;
     }
 
@@ -130,4 +128,10 @@ public class StormMetricRegistry {
             sr.stop();
         }
     }
+
+    public static String metricName(String name, String stormId, String componentId, Integer workerPort){
+        return String.format("storm.worker.%s.%s.%s.%s-%s", stormId, hostName, componentId, workerPort, name);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0be278a4/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
index 080eb9a..444a8a7 100644
--- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -390,23 +390,23 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     public Timer registerTimer(String name){
-        return StormMetricRegistry.registtry().timer(metricName(name));
+        return StormMetricRegistry.registry().timer(metricName(name));
     }
 
     public Histogram registerHistogram(String name){
-        return StormMetricRegistry.registtry().histogram(metricName(name));
+        return StormMetricRegistry.registry().histogram(metricName(name));
     }
 
     public Meter registerMeter(String name){
-        return StormMetricRegistry.registtry().meter(metricName(name));
+        return StormMetricRegistry.registry().meter(metricName(name));
     }
 
     public Counter registerCounter(String name){
-        return StormMetricRegistry.registtry().counter(metricName(name));
+        return StormMetricRegistry.registry().counter(metricName(name));
     }
 
     public Gauge registerGauge(String name, Gauge gauge){
-        return StormMetricRegistry.registtry().register(metricName(name), gauge);
+        return StormMetricRegistry.registry().register(metricName(name), gauge);
     }
 
     private String metricName(String name){

Reply via email to