Repository: storm
Updated Branches:
  refs/heads/1.x-branch 25fa9dd7c -> c4404cab6


STORM-2153: New Metrics Reporting API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa1e59f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa1e59f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa1e59f4

Branch: refs/heads/1.x-branch
Commit: fa1e59f4408f4017d4b6c69e672eb7c27d68f3a7
Parents: e85b64a
Author: P. Taylor Goetz <[email protected]>
Authored: Tue Jul 11 13:58:16 2017 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Tue Jul 11 13:58:16 2017 -0400

----------------------------------------------------------------------
 conf/defaults.yaml                              |  25 ++++
 .../storm/starter/ExclamationTopology.java      |   2 +-
 .../apache/storm/starter/ReliableWordCount.java | 121 +++++++++++++++++
 .../apache/storm/starter/WordCountTopology.java |   2 +-
 external/storm-autocreds/pom.xml                |   4 +-
 .../hdfs/avro/ConfluentAvroSerializer.java      |   2 +-
 pom.xml                                         |  10 ++
 storm-core/pom.xml                              |  17 ++-
 .../clj/org/apache/storm/daemon/executor.clj    |  18 ++-
 .../src/clj/org/apache/storm/daemon/task.clj    |   7 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  12 +-
 .../src/clj/org/apache/storm/disruptor.clj      |   5 +-
 storm-core/src/jvm/org/apache/storm/Config.java |   3 +
 .../apache/storm/metrics2/DisruptorMetrics.java |  93 +++++++++++++
 .../org/apache/storm/metrics2/SimpleGauge.java  |  38 ++++++
 .../storm/metrics2/StormMetricRegistry.java     | 133 +++++++++++++++++++
 .../reporters/ConsoleStormReporter.java         |  63 +++++++++
 .../metrics2/reporters/CsvStormReporter.java    |  93 +++++++++++++
 .../reporters/GangliaStormReporter.java         | 133 +++++++++++++++++++
 .../reporters/GraphiteStormReporter.java        | 100 ++++++++++++++
 .../metrics2/reporters/JmxStormReporter.java    |  88 ++++++++++++
 .../reporters/SheduledStormReporter.java        |  71 ++++++++++
 .../storm/metrics2/reporters/StormReporter.java |  32 +++++
 .../org/apache/storm/task/TopologyContext.java  |  26 ++++
 .../org/apache/storm/utils/DisruptorQueue.java  |  43 ++++--
 .../utils/DisruptorQueueBackpressureTest.java   |   2 +-
 .../apache/storm/utils/DisruptorQueueTest.java  |   4 +-
 27 files changed, 1111 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f89211b..b01e0b7 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -293,3 +293,28 @@ storm.daemon.metrics.reporter.plugins:
 
 # configuration of cluster metrics consumer
 storm.cluster.metrics.consumer.publish.interval.secs: 60
+
+
+storm.metrics.reporters:
+  # Graphite Reporter
+  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+    daemons:
+        - "supervisor"
+        - "nimbus"
+        - "worker"
+    report.period: 60
+    report.period.units: "SECONDS"
+    graphite.host: "localhost"
+    graphite.port: 2003
+
+  # Console Reporter
+  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+    daemons:
+        - "worker"
+    report.period: 10
+    report.period.units: "SECONDS"
+
+    #TODO: not functional, but you get the idea
+    filters:
+        "org.apache.storm.metrics2.filters.RegexFilter":
+            expression: ".*my_component.*emitted.*"

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 26e0430..9284b52 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -79,7 +79,7 @@ public class ExclamationTopology {
 
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
+      Utils.sleep(100000);
       cluster.killTopology("test");
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
new file mode 100644
index 0000000..f05b521
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
@@ -0,0 +1,121 @@
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+
+public class ReliableWordCount {
+    public static class RandomSentenceSpout extends BaseRichSpout {
+        private static final Logger LOG = 
LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+        SpoutOutputCollector _collector;
+        Random _rand;
+
+
+        @Override
+        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = new Random();
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(10);
+            String[] sentences = new String[]{sentence("the cow jumped over 
the moon"), sentence("an apple a day keeps the doctor away"),
+                    sentence("four score and seven years ago"), sentence("snow 
white and the seven dwarfs"), sentence("i am at two with nature")};
+            final String sentence = sentences[_rand.nextInt(sentences.length)];
+
+            _collector.emit(new Values(sentence), UUID.randomUUID());
+        }
+
+        protected String sentence(String input) {
+            return input;
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 
4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", 
new Fields("word"));
+
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(3);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, builder.createTopology());
+
+        Thread.sleep(600000);
+
+        cluster.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..0611894 100644
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -98,7 +98,7 @@ public class WordCountTopology {
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology("word-count", conf, builder.createTopology());
 
-      Thread.sleep(10000);
+      Thread.sleep(60000);
 
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/external/storm-autocreds/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index ab654a3..c05b620 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -15,9 +15,7 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 2008a3e..087aec5 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import java.util.Map;
 
 /**
- * This class provides a mechanism to utilize the Confluent Schema Registry 
(https://github.com/confluentinc/schema-registry)
+ * This class provides a mechanism to utilize the Confluent Schema 
Registry (https://github.com/confluentinc/schema-registry)
  * for Storm to (de)serialize Avro generic records across a topology.  It 
assumes the schema registry is up and running
  * completely independent of Storm.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f53171b..6ecb150 100644
--- a/pom.xml
+++ b/pom.xml
@@ -886,6 +886,16 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-graphite</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-ganglia</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>metrics-clojure</groupId>
                 <artifactId>metrics-clojure</artifactId>
                 <version>${metrics-clojure.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 0497bdc..e10222a 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -281,6 +281,14 @@
             <artifactId>metrics-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-ganglia</artifactId>
+        </dependency>
+        <dependency>
             <groupId>metrics-clojure</groupId>
             <artifactId>metrics-clojure</artifactId>
         </dependency>
@@ -526,7 +534,6 @@
                             <include>org.clojure:tools.namespace</include>
                             <include>cheshire:cheshire</include>
                             <include>org.clojure:core.incubator</include>
-                            <include>io.dropwizard.metrics:*</include>
                             <include>metrics-clojure:*</include>
                         </includes>
                     </artifactSet>
@@ -700,10 +707,10 @@
                           <pattern>org.eclipse.jetty</pattern>
                           
<shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern>
                         </relocation>
-                        <relocation>
-                            <pattern>com.codahale.metrics</pattern>
-                            
<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>
-                        </relocation>
+                        <!--<relocation>-->
+                            <!--<pattern>com.codahale.metrics</pattern>-->
+                            
<!--<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>-->
+                        <!--</relocation>-->
                         <relocation>
                             <pattern>metrics.core</pattern>
                             
<shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8126a80..3e5dd20 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -35,6 +35,8 @@
   (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
+  (:import [org.apache.storm.metrics2 StormMetricRegistry])
+  (:import [com.codahale.metrics Meter])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
   (:require [org.apache.storm [thrift :as thrift]
@@ -231,6 +233,8 @@
                                   (str "executor"  executor-id "-send-queue")
                                   (storm-conf 
TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                  (.getStormId worker-context)
+                                  (.getThisWorkerPort worker-context)
                                   :producer-type :single-threaded
                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                   :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
@@ -275,7 +279,9 @@
                                (log-message "Got interrupted excpetion 
shutting thread down...")
                                ((:suicide-fn <>))))
      :sampler (mk-stats-sampler storm-conf)
-     :spout-throttling-metrics (if (= executor-type :spout) 
+     :failed-meter (StormMetricRegistry/meter "failed" worker-context 
component-id)
+     :acked-meter (StormMetricRegistry/meter "acked" worker-context 
component-id)
+     :spout-throttling-metrics (if (= executor-type :spout)
                                 (builtin-metrics/make-spout-throttling-data)
                                 nil)
      ;; TODO: add in the executor-specific stuff in a :specific... or make a 
spout-data, bolt-data function?
@@ -429,10 +435,12 @@
 (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta 
reason id debug?]
   (let [^ISpout spout (:object task-data)
         storm-conf (:storm-conf executor-data)
-        task-id (:task-id task-data)]
+        task-id (:task-id task-data)
+        failed-meter (:failed-meter executor-data)]
     ;;TODO: need to throttle these when there's lots of failures
     (when debug?
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " 
MSG-ID: " msg-id))
+    (.mark failed-meter)
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. 
msg-id task-id time-delta))
     (when time-delta
@@ -440,8 +448,10 @@
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id 
debug?]
   (let [^ISpout spout (:object task-data)
-        task-id (:task-id task-data)]
+        task-id (:task-id task-data)
+        acked-meter (:acked-meter executor-data)]
     (when debug? (log-message "SPOUT Acking message " id " " msg-id))
+    (.mark acked-meter)
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. 
msg-id task-id time-delta))
     (when time-delta
@@ -809,6 +819,7 @@
                          (let [delta (tuple-time-delta! tuple)]
                            (when debug? 
                              (log-message "BOLT ack TASK: " task-id " TIME: " 
delta " TUPLE: " tuple))
+                           (.mark  ^Meter (:acked-meter (:executor-data 
task-data)))
                            (task/apply-hooks user-context .boltAck 
(BoltAckInfo. tuple task-id delta))
                            (when delta
                              (stats/bolt-acked-tuple! executor-stats
@@ -824,6 +835,7 @@
                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                            (when debug? 
                              (log-message "BOLT fail TASK: " task-id " TIME: " 
delta " TUPLE: " tuple))
+                           (.mark  ^Meter (:failed-meter (:executor-data 
task-data)))
                            (task/apply-hooks user-context .boltFail 
(BoltFailInfo. tuple task-id delta))
                            (when delta
                              (stats/bolt-failed-tuple! executor-stats

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj 
b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 1ae9b22..2e4df75 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -23,10 +23,12 @@
   (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo])
   (:import [org.apache.storm.task TopologyContext ShellBolt 
WorkerTopologyContext])
+  (:import [org.apache.storm.metrics2 StormMetricRegistry])
   (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
+  (:import [com.codahale.metrics Meter])
   (:require [org.apache.storm
              [thrift :as thrift]
              [stats :as stats]])
@@ -128,9 +130,11 @@
         stream->component->grouper (:stream->component->grouper executor-data)
         user-context (:user-context task-data)
         executor-stats (:stats executor-data)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+        debug? (= true (storm-conf TOPOLOGY-DEBUG))
+        ^Meter emitted-meter (StormMetricRegistry/meter "emitted" 
worker-context component-id)]
         
     (fn ([^Integer out-task-id ^String stream ^List values]
+          (.mark emitted-meter)
           (when debug?
             (log-message "Emitting direct: " out-task-id "; " component-id " " 
stream " " values))
           (let [target-component (.getComponentId worker-context out-task-id)
@@ -147,6 +151,7 @@
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
+           (.mark emitted-meter)
            (when debug?
              (log-message "Emitting: " component-id " " stream " " values))
            (let [out-tasks (ArrayList.)]

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 6626272..b2810db 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -43,6 +43,7 @@
   (:import [org.apache.logging.log4j Level])
   (:import [org.apache.logging.log4j.core.config LoggerConfig])
   (:import [org.apache.storm.generated LogConfig LogLevelAction])
+  (:import [org.apache.storm.metrics2 StormMetricRegistry])
   (:gen-class))
 
 (defmulti mk-suicide-fn cluster-mode)
@@ -204,17 +205,19 @@
           (transfer-fn serializer tuple-batch)))
       transfer-fn)))
 
-(defn- mk-receive-queue-map [storm-conf executors]
+(defn- mk-receive-queue-map [storm-conf executors storm-id port]
   (->> executors
        ;; TODO: this depends on the type of executor
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                   (storm-conf 
TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  storm-id port
                                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
        ))
 
+
 (defn- stream->fields [^StormTopology topology component]
   (->> (ThriftTopologyUtils/getComponentCommon topology component)
        .get_streams
@@ -253,9 +256,10 @@
         executors (set (read-worker-executors storm-conf storm-cluster-state 
storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" 
(storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  storm-id port
                                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+        executor-receive-queue-map (mk-receive-queue-map storm-conf executors 
storm-id port)
 
         receive-queue-map (->> executor-receive-queue-map
                                (mapcat (fn [[e queue]] (for [t 
(executor-id->tasks e)] [t queue])))
@@ -595,7 +599,7 @@
       (spit (worker-artifacts-pid-path conf storm-id port) pid)))
 
   (declare establish-log-setting-callback)
-
+  (StormMetricRegistry/start conf DaemonType/WORKER)
   ;; start out with empty list of timeouts 
   (def latest-log-config (atom {}))
   (def original-log-levels (atom {}))
@@ -689,6 +693,8 @@
 
                     (close-resources worker)
 
+                    (StormMetricRegistry/stop)
+
                     (log-message "Trigger any worker shutdown hooks")
                     (run-worker-shutdown-hooks worker)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj 
b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 1546b3f..73a9d84 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -16,6 +16,7 @@
 
 (ns org.apache.storm.disruptor
   (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback 
DisruptorBackpressureCallback])
+  (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [com.lmax.disruptor.dsl ProducerType])
   (:require [clojure [string :as str]])
   (:require [clojure [set :as set]])
@@ -27,10 +28,10 @@
    :single-threaded ProducerType/SINGLE})
 
 (defnk disruptor-queue
-  [^String queue-name buffer-size timeout :producer-type :multi-threaded 
:batch-size 100 :batch-timeout 1]
+  [^String queue-name buffer-size timeout ^String storm-id ^Integer 
worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
   (DisruptorQueue. queue-name
                    (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout))
+                   timeout batch-size batch-timeout storm-id worker-port))
 
 (defn clojure-handler
   [afn]

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 43df951..c547530 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -139,6 +139,9 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String STORM_META_SERIALIZATION_DELEGATE = 
"storm.meta.serialization.delegate";
 
+    @isType(type=List.class)
+    public static final String STORM_METRICS_REPORTERS = 
"storm.metrics.reporters";
+
     /**
      * A list of daemon metrics  reporter plugin class names.
      * These plugins must implement {@link 
org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
new file mode 100644
index 0000000..994a965
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+public class DisruptorMetrics {
+    private SimpleGauge<Long> capacity;
+    private SimpleGauge<Long> population;
+    private SimpleGauge<Long> writePosition;
+    private SimpleGauge<Long> readPosition;
+    private SimpleGauge<Double> arrivalRate; // TODO: Change to meter
+    private SimpleGauge<Double> sojournTime;
+    private SimpleGauge<Long> overflow;
+    private SimpleGauge<Float> pctFull;
+
+
+    DisruptorMetrics(SimpleGauge<Long> capacity,
+                    SimpleGauge<Long> population,
+                    SimpleGauge<Long> writePosition,
+                    SimpleGauge<Long> readPosition,
+                    SimpleGauge<Double> arrivalRate,
+                    SimpleGauge<Double> sojournTime,
+                    SimpleGauge<Long> overflow,
+                    SimpleGauge<Float> pctFull) {
+        this.capacity = capacity;
+        this.population = population;
+        this.writePosition = writePosition;
+        this.readPosition = readPosition;
+        this.arrivalRate = arrivalRate;
+        this.sojournTime = sojournTime;
+        this.overflow = overflow;
+        this.pctFull = pctFull;
+    }
+
+    public void setCapacity(Long capacity) {
+        this.capacity.set(capacity);
+    }
+
+    public void setPopulation(Long population) {
+        this.population.set(population);
+    }
+
+    public void setWritePosition(Long writePosition) {
+        this.writePosition.set(writePosition);
+    }
+
+    public void setReadPosition(Long readPosition) {
+        this.readPosition.set(readPosition);
+    }
+
+    public void setArrivalRate(Double arrivalRate) {
+        this.arrivalRate.set(arrivalRate);
+    }
+
+    public void setSojournTime(Double soujournTime) {
+        this.sojournTime.set(soujournTime);
+    }
+
+    public void setOverflow(Long overflow) {
+        this.overflow.set(overflow);
+    }
+
+    public void setPercentFull(Float pctFull){
+        this.pctFull.set(pctFull);
+    }
+
+    public void set(DisruptorQueue.QueueMetrics metrics){
+        this.capacity.set(metrics.capacity());
+        this.population.set(metrics.population());
+        this.writePosition.set(metrics.writePos());
+        this.readPosition.set(metrics.readPos());
+        this.arrivalRate.set(metrics.arrivalRate());
+        this.sojournTime.set(metrics.sojournTime());
+        this.overflow.set(metrics.overflow());
+        this.pctFull.set(metrics.pctFull());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
new file mode 100644
index 0000000..5240f26
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+
+import com.codahale.metrics.Gauge;
+
+public class SimpleGauge<T> implements Gauge<T> {
+    private T value;
+
+    public SimpleGauge(T value){
+        this.value = value;
+    }
+
+    @Override
+    public T getValue() {
+        return this.value;
+    }
+
+    public void set(T value){
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
new file mode 100644
index 0000000..ced1233
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class StormMetricRegistry {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricRegistry.class);
+
+    private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+    private static final List<StormReporter> REPORTERS = new ArrayList<>();
+
+    private static String hostName = null;
+
+    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, Integer port){
+        SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+        String metricName = String.format("storm.worker.%s.%s-%s", topologyId, 
port, name);
+        if(REGISTRY.getGauges().containsKey(metricName)){
+            return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+        } else {
+            return REGISTRY.register(metricName, gauge);
+        }
+    }
+
+    public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, Integer port){
+        return new DisruptorMetrics(
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, 
port),
+                StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, port),
+                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, port),
+                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, 
port),
+                StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, port)
+        );
+    }
+
+    public static Meter meter(String name, WorkerTopologyContext context, 
String componentId){
+        // storm.worker.{topology}.{host}.{port}
+        String metricName = String.format("storm.worker.%s.%s.%s.%s-%s", 
context.getStormId(), hostName,
+                componentId, context.getThisWorkerPort(), name);
+        return REGISTRY.meter(metricName);
+    }
+
+    public static void start(Map<String, Object> stormConfig, DaemonType type){
+        String localHost = 
(String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
+        if(localHost != null){
+            hostName = localHost;
+        } else {
+            try {
+                hostName = InetAddress.getLocalHost().getCanonicalHostName();
+            } catch (UnknownHostException e) {
+                 LOG.warn("Unable to determine hostname while starting the 
metrics system. Hostname ill be reported" +
+                         " as 'localhost'.");
+            }
+        }
+
+        LOG.info("Starting metrics reporters...");
+        List<Map<String, Object>> reporterList = (List<Map<String, 
Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+        for(Map<String, Object> reporterConfig : reporterList){
+            // only start those requested
+            List<String> daemons = (List<String>)reporterConfig.get("daemons");
+            for(String daemon : daemons){
+                if(DaemonType.valueOf(daemon.toUpperCase()) == type){
+                    startReporter(stormConfig, reporterConfig);
+                }
+            }
+        }
+    }
+
+    public static MetricRegistry registtry(){
+        return REGISTRY;
+    }
+
+    private static void startReporter(Map<String, Object> stormConfig, 
Map<String, Object> reporterConfig){
+        String clazz = (String)reporterConfig.get("class");
+        StormReporter reporter = null;
+        LOG.info("Attempting to instantiate reporter class: {}", clazz);
+        try{
+            reporter = instantiate(clazz);
+        } catch(Exception e){
+            LOG.warn("Unable to instantiate metrics reporter class: {}. Will 
skip this reporter.", clazz, e);
+        }
+        if(reporter != null){
+            reporter.prepare(REGISTRY, stormConfig, reporterConfig);
+            reporter.start();
+            REPORTERS.add(reporter);
+        }
+
+    }
+
+    private static StormReporter instantiate(String klass) throws 
ClassNotFoundException, IllegalAccessException, InstantiationException {
+        Class<?> c = Class.forName(klass);
+        return  (StormReporter) c.newInstance();
+    }
+
+    public static void stop(){
+        for(StormReporter sr : REPORTERS){
+            sr.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
new file mode 100644
index 0000000..5322bf8
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsoleStormReporter extends 
SheduledStormReporter<ConsoleReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(ConsoleStormReporter.class);
+
+    @Override
+    public void prepare(MetricRegistry registry, Map stormConf, Map 
reporterConf) {
+        LOG.debug("Preparing ConsoleReporter");
+        ConsoleReporter.Builder builder = 
ConsoleReporter.forRegistry(registry);
+
+        builder.outputTo(System.out);
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        reporter = builder.build();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
new file mode 100644
index 0000000..4225b7c
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvStormReporter extends SheduledStormReporter<CsvReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(CsvStormReporter.class);
+
+    public static final String CSV_LOG_DIR = "csv.log.dir";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+        LOG.debug("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = MetricsUtils.getMetricsReporterLocale(reporterConf);
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        File csvMetricsDir = getCsvLogDir(stormConf, reporterConf);
+        reporter = builder.build(csvMetricsDir);
+    }
+
+
+    private static File getCsvLogDir(Map stormConf, Map reporterConf) {
+        String csvMetricsLogDirectory = 
Utils.getString(reporterConf.get(CSV_LOG_DIR), null);
+        if (csvMetricsLogDirectory == null) {
+            csvMetricsLogDirectory = 
ConfigUtils.absoluteStormLocalDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + 
ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+        }
+        File csvMetricsDir = new File(csvMetricsLogDirectory);
+        validateCreateOutputDir(csvMetricsDir);
+        return csvMetricsDir;
+    }
+
+    private static void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have 
write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a 
directory.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
new file mode 100644
index 0000000..d8d0269
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends 
SheduledStormReporter<GangliaReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(GangliaStormReporter.class);
+
+    public static final String GANGLIA_HOST = "ganglia.host";
+    public static final String GANGLIA_PORT = "ganglia.port";
+    public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with";
+    public static final String GANGLIA_DMAX = "ganglia.dmax";
+    public static final String GANGLIA_TMAX = "ganglia.tmax";
+    public static final String GANGLIA_UDP_ADDRESSING_MODE = 
"ganglia.udp.addressing.mode";
+    public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+    public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit";
+    public static final String GANGLIA_TTL = "ganglia.ttl";
+    public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+        LOG.debug("Preparing...");
+        GangliaReporter.Builder builder = 
GangliaReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        Integer dmax = getGangliaDMax(reporterConf);
+        if (prefix != null) {
+            builder.withDMax(dmax);
+        }
+
+        Integer tmax = getGangliaTMax(reporterConf);
+        if (prefix != null) {
+            builder.withTMax(tmax);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        String group = getMetricsTargetUDPGroup(reporterConf);
+        Integer port = getMetricsTargetPort(reporterConf);
+        String udpAddressingMode = 
getMetricsTargetUDPAddressingMode(reporterConf);
+        Integer ttl = getMetricsTargetTtl(reporterConf);
+
+        GMetric.UDPAddressingMode mode = 
udpAddressingMode.equalsIgnoreCase("multicast") ?
+                GMetric.UDPAddressingMode.MULTICAST : 
GMetric.UDPAddressingMode.UNICAST;
+
+        try {
+            GMetric sender = new GMetric(group, port, mode, ttl);
+            reporter = builder.build(sender);
+        }catch (IOException ioe){
+            //TODO
+            LOG.error("Exception in GangliaReporter config", ioe);
+        }
+    }
+
+
+    public static String getMetricsTargetUDPGroup(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_GROUP), null);
+    }
+
+    public static String getMetricsTargetUDPAddressingMode(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), 
null);
+    }
+
+    public static Integer getMetricsTargetTtl(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TTL), null);
+    }
+
+    public static Integer getGangliaDMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_DMAX), null);
+    }
+
+    public static Integer getGangliaTMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TMAX), null);
+    }
+
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_PORT), null);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
new file mode 100644
index 0000000..7a2b31b
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GraphiteStormReporter extends 
SheduledStormReporter<GraphiteReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+    public static final String GRAPHITE_PREFIXED_WITH = 
"graphite.prefixed.with";
+    public static final String GRAPHITE_HOST = "graphite.host";
+    public static final String GRAPHITE_PORT = "graphite.port";
+    public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map 
reporterConf) {
+        LOG.debug("Preparing...");
+        GraphiteReporter.Builder builder = 
GraphiteReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        String host = getMetricsTargetHost(reporterConf);
+        Integer port = getMetricsTargetPort(reporterConf);
+        String transport = getMetricsTargetTransport(reporterConf);
+        GraphiteSender sender = null;
+        //TODO: error checking
+        if (transport.equalsIgnoreCase("udp")) {
+            sender = new GraphiteUDP(host, port);
+        } else {
+            //TODO: pickled support
+            sender = new Graphite(host, port);
+        }
+        reporter = builder.build(sender);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
+    }
+
+    private static String getMetricsTargetHost(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
+    }
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
+    }
+
+    private static String getMetricsTargetTransport(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
new file mode 100644
index 0000000..7ac6cde
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxStormReporter implements StormReporter<JmxReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(JmxStormReporter.class);
+    public static final String JMX_DOMAIN = "jmx.domain";
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map<String, Object> 
stormConf, Map<String, Object> reporterConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = 
MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        String domain = getMetricsJMXDomain(reporterConf);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+
+        // TODO: expose some simple MetricFilters
+        // other builder functions not exposed:
+        //  * createsObjectNamesWith(ObjectNameFactory onFactory) 
+        //  * registerWith (MBeanServer)
+        //  * specificDurationUnits (Map<String,TimeUnit> 
specificDurationUnits)
+        //  * specificRateUnits(Map<String,TimeUnit> specificRateUnits)
+
+        reporter = builder.build();
+    }
+
+    public static String getMetricsJMXDomain(Map reporterConf) {
+        return Utils.getString(reporterConf, JMX_DOMAIN);
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
new file mode 100644
index 0000000..1b1e7a0
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class SheduledStormReporter<T extends ScheduledReporter> 
implements StormReporter{
+    private static final Logger LOG = 
LoggerFactory.getLogger(SheduledStormReporter.class);
+    protected ScheduledReporter reporter;
+    long reportingPeriod;
+    TimeUnit reportingPeriodUnit;
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start(reportingPeriod, reportingPeriodUnit);
+        } else {
+            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
+        }
+    }
+
+
+    static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+        TimeUnit unit = getTimeUnitForConfig(reporterConf, 
REPORT_PERIOD_UNITS);
+        return unit == null ? TimeUnit.SECONDS : unit;
+    }
+
+    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String 
configName) {
+        String rateUnitString = Utils.getString(reporterConf.get(configName), 
null);
+        if (rateUnitString != null) {
+            return TimeUnit.valueOf(rateUnitString);
+        }
+        return null;
+    }
+
+    static long getReportPeriod(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
new file mode 100644
index 0000000..c36e44e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter<T extends Reporter> {
+    String REPORT_PERIOD = "report.period";
+    String REPORT_PERIOD_UNITS = "report.period.units";
+
+    void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, 
Map<String, Object> reporterConf);
+    void start();
+    void stop();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java 
b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
index 91cbee9..080eb9a 100644
--- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.task;
 
+import com.codahale.metrics.*;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.StormTopology;
@@ -26,6 +27,7 @@ import org.apache.storm.metric.api.IReducer;
 import org.apache.storm.metric.api.ICombiner;
 import org.apache.storm.metric.api.ReducedMetric;
 import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.state.ISubscribedState;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
@@ -386,4 +388,28 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
     public CombinedMetric registerMetric(String name, ICombiner combiner, int 
timeBucketSizeInSecs) {
         return registerMetric(name, new CombinedMetric(combiner), 
timeBucketSizeInSecs);
     }
+
+    public Timer registerTimer(String name){
+        return StormMetricRegistry.registry().timer(metricName(name));
+    }
+
+    public Histogram registerHistogram(String name){
+        return StormMetricRegistry.registry().histogram(metricName(name));
+    }
+
+    public Meter registerMeter(String name){
+        return StormMetricRegistry.registry().meter(metricName(name));
+    }
+
+    public Counter registerCounter(String name){
+        return StormMetricRegistry.registry().counter(metricName(name));
+    }
+
+    public Gauge registerGauge(String name, Gauge gauge){
+        return StormMetricRegistry.registry().register(metricName(name), gauge);
+    }
+
+    private String metricName(String name){
+        return String.format("storm.topology.%s.%s.%s-%s", getStormId(), 
getThisComponentId(), getThisWorkerPort(), name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fe90240..35bc83f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -34,6 +34,9 @@ import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.DisruptorMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +67,7 @@ public class DisruptorQueue implements IStatefulObject {
     private static final String PREFIX = "disruptor-";
     private static final FlusherPool FLUSHER = new FlusherPool();
     
+    private static final Timer METRICS_TIMER = new 
Timer("disruptor-metrics-timer", true);
     private static int getNumFlusherPoolThreads() {
         int numThreads = 100;
         try {
@@ -345,27 +349,31 @@ public class DisruptorQueue implements IStatefulObject {
             return (1.0F * population() / capacity());
         }
 
-        public Object getState() {
-            Map state = new HashMap<String, Object>();
+        public double arrivalRate(){
+            return _rateTracker.reportRate();
+        }
 
+        public double sojournTime(){
             // get readPos then writePos so it's never an under-estimate
             long rp = readPos();
             long wp = writePos();
-
-            final double arrivalRateInSecs = _rateTracker.reportRate();
+            final double arrivalRateInSecs = arrivalRate();
 
             //Assume the queue is stable, in which the arrival rate is equal 
to the consumption rate.
             // If this assumption does not hold, the calculation of sojourn 
time should also consider
             // departure rate according to Queuing Theory.
-            final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 
0.00001) * 1000.0;
+            return (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+        }
 
+        public Object getState() {
+            Map state = new HashMap<String, Object>();
             state.put("capacity", capacity());
-            state.put("population", wp - rp);
-            state.put("write_pos", wp);
-            state.put("read_pos", rp);
-            state.put("arrival_rate_secs", arrivalRateInSecs);
-            state.put("sojourn_time_ms", sojournTime); //element sojourn time 
in milliseconds
-            state.put("overflow", _overflowCount.get());
+            state.put("population", population());
+            state.put("write_pos", writePos());
+            state.put("read_pos", readPos());
+            state.put("arrival_rate_secs", arrivalRate());
+            state.put("sojourn_time_ms", sojournTime()); //element sojourn 
time in milliseconds
+            state.put("overflow", overflow());
 
             return state;
         }
@@ -385,7 +393,8 @@ public class DisruptorQueue implements IStatefulObject {
     private final int _inputBatchSize;
     private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new 
ConcurrentHashMap<Long, ThreadLocalInserter>();
     private final Flusher _flusher;
-    private final QueueMetrics _metrics;
+    private final QueueMetrics _metrics; // old metrics API
+    private final DisruptorMetrics _disruptorMetrics;
 
     private String _queueName = "";
     private DisruptorBackpressureCallback _cb = null;
@@ -395,7 +404,7 @@ public class DisruptorQueue implements IStatefulObject {
     private final AtomicLong _overflowCount = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
-    public DisruptorQueue(String queueName, ProducerType type, int size, long 
readTimeout, int inputBatchSize, long flushInterval) {
+    public DisruptorQueue(String queueName, ProducerType type, int size, long 
readTimeout, int inputBatchSize, long flushInterval, String topologyId, int 
port) {
         this._queueName = PREFIX + queueName;
         WaitStrategy wait;
         if (readTimeout <= 0) {
@@ -409,12 +418,20 @@ public class DisruptorQueue implements IStatefulObject {
         _barrier = _buffer.newBarrier();
         _buffer.addGatingSequences(_consumer);
         _metrics = new QueueMetrics();
+        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, 
topologyId, port);
         //The batch size can be no larger than half the full queue size.
         //This is mostly to avoid contention issues.
         _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
+
+        METRICS_TIMER.schedule(new TimerTask(){
+            @Override
+            public void run() {
+                _disruptorMetrics.set(_metrics);
+            }
+        }, 15000, 15000); // TODO: Configurable interval
     }
 
     public String getName() {

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index 7072e55..110fe88 100644
--- 
a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends 
TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index e7ac54e..c834cbb 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", 1000);
     }
 
     private static DisruptorQueue createQueue(String name, int batchSize, int 
queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 
batchSize, 1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 
batchSize, 1L, "test", 1000);
     }
 }

Reply via email to