http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
new file mode 100644
index 0000000..bef9585
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsoleStormReporter extends ScheduledStormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
+
+    /**
+     * Configures a {@link ConsoleReporter} over the given registry that prints to standard output.
+     *
+     * <p>Locale, rate unit and duration unit are looked up in {@code stormConf}; the metrics
+     * filter and the reporting period/unit are looked up in {@code reporterConf}. An optional
+     * setting is applied to the builder only when its lookup returns a non-null value.
+     *
+     * @param registry     registry whose metrics are reported
+     * @param stormConf    configuration consulted for locale and rate/duration units
+     * @param reporterConf reporter-specific configuration (filter, report period and unit)
+     */
+    @Override
+    public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing ConsoleReporter");
+        final ConsoleReporter.Builder consoleBuilder = ConsoleReporter.forRegistry(registry);
+        consoleBuilder.outputTo(System.out);
+
+        final Locale outputLocale = ClientMetricsUtils.getMetricsReporterLocale(stormConf);
+        if (null != outputLocale) {
+            consoleBuilder.formattedFor(outputLocale);
+        }
+
+        final TimeUnit ratesIn = ClientMetricsUtils.getMetricsRateUnit(stormConf);
+        if (null != ratesIn) {
+            consoleBuilder.convertRatesTo(ratesIn);
+        }
+
+        final TimeUnit durationsIn = ClientMetricsUtils.getMetricsDurationUnit(stormConf);
+        if (null != durationsIn) {
+            consoleBuilder.convertDurationsTo(durationsIn);
+        }
+
+        final StormMetricsFilter metricsFilter = getMetricsFilter(reporterConf);
+        if (null != metricsFilter) {
+            consoleBuilder.filter(metricsFilter);
+        }
+
+        // Schedule used by start(): the period falls back to 10, the unit to seconds.
+        reportingPeriod = getReportPeriod(reporterConf);
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        reporter = consoleBuilder.build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
new file mode 100644
index 0000000..c52cd2c
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricRegistry;
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CsvStormReporter extends ScheduledStormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class);
+
+    /** Reporter configuration key naming the directory the CSV files are written to. */
+    public static final String CSV_LOG_DIR = "csv.log.dir";
+
+    /**
+     * Configures a {@link CsvReporter} over the given registry.
+     *
+     * <p>Locale, rate unit, duration unit, filter and reporting period/unit are read from
+     * {@code reporterConf}; optional settings are applied only when their lookup returns a
+     * non-null value. The output directory is resolved (and created if necessary) by
+     * {@link #getCsvLogDir(Map, Map)}.
+     *
+     * @param metricsRegistry registry whose metrics are reported
+     * @param stormConf       configuration used to derive the default output directory
+     * @param reporterConf    reporter-specific configuration
+     * @throws IllegalStateException if the output directory cannot be created, is not a
+     *                               directory, or is not writable
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = ClientMetricsUtils.getMetricsReporterLocale(reporterConf);
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if (filter != null) {
+            builder.filter(filter);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        File csvMetricsDir = getCsvLogDir(stormConf, reporterConf);
+        reporter = builder.build(csvMetricsDir);
+    }
+
+    /**
+     * Resolves the CSV output directory: the value of {@link #CSV_LOG_DIR} in
+     * {@code reporterConf} when present, otherwise a {@code csvmetrics} directory under the
+     * absolute Storm local dir. The directory is created and validated before being returned.
+     */
+    private static File getCsvLogDir(Map stormConf, Map reporterConf) {
+        String csvMetricsLogDirectory = ObjectReader.getString(reporterConf.get(CSV_LOG_DIR), null);
+        if (csvMetricsLogDirectory == null) {
+            csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+        }
+        File csvMetricsDir = new File(csvMetricsLogDirectory);
+        validateCreateOutputDir(csvMetricsDir);
+        return csvMetricsDir;
+    }
+
+    /**
+     * Ensures {@code dir} exists as a writable directory, creating it (and any missing
+     * parents) when absent.
+     *
+     * @throws IllegalStateException if the directory cannot be created, the path is not a
+     *                               directory, or it is not writable
+     */
+    private static void validateCreateOutputDir(File dir) {
+        // mkdirs() also returns false when something else created the directory in the
+        // meantime, so only fail if the directory is still missing afterwards.
+        if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory()) {
+            throw new IllegalStateException(dir.getAbsolutePath() + " could not be created.");
+        }
+        // Check the kind of path before its permissions so that a read-only regular file is
+        // reported as "not a directory" rather than as a permissions problem.
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getAbsolutePath() + " is not a directory.");
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getAbsolutePath()
+                    + " does not have write permissions.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
new file mode 100644
index 0000000..c9f3253
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ganglia.GangliaReporter;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GangliaStormReporter extends ScheduledStormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class);
+
+    public static final String GANGLIA_HOST = "ganglia.host";
+    public static final String GANGLIA_PORT = "ganglia.port";
+    public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with";
+    public static final String GANGLIA_DMAX = "ganglia.dmax";
+    public static final String GANGLIA_TMAX = "ganglia.tmax";
+    public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode";
+    public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+    public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit";
+    public static final String GANGLIA_TTL = "ganglia.ttl";
+    public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+    /**
+     * Configures a {@link GangliaReporter} over the given registry from {@code reporterConf}.
+     *
+     * <p>Duration unit, rate unit, filter, prefix, dmax and tmax are optional and are applied
+     * only when configured. The UDP addressing mode is multicast when
+     * {@link #GANGLIA_UDP_ADDRESSING_MODE} equals "multicast" (case-insensitive) and unicast
+     * otherwise, including when it is not configured.
+     *
+     * <p>If the Ganglia sender cannot be created the {@link IOException} is logged and the
+     * reporter is left unset.
+     *
+     * @param metricsRegistry registry whose metrics are reported
+     * @param stormConf       not used by this reporter
+     * @param reporterConf    reporter-specific configuration
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        // Guard on the value itself: testing the prefix here would skip a configured dmax
+        // when no prefix is set, and unbox a null dmax when a prefix is set.
+        Integer dmax = getGangliaDMax(reporterConf);
+        if (dmax != null) {
+            builder.withDMax(dmax);
+        }
+
+        Integer tmax = getGangliaTMax(reporterConf);
+        if (tmax != null) {
+            builder.withTMax(tmax);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        String group = getMetricsTargetUdpGroup(reporterConf);
+        // NOTE(review): port and ttl have no default; they are unboxed when passed to the
+        // GMetric constructor below, so both presumably must be configured - confirm.
+        Integer port = getMetricsTargetPort(reporterConf);
+        String udpAddressingMode = getMetricsTargetUdpAddressingMode(reporterConf);
+        Integer ttl = getMetricsTargetTtl(reporterConf);
+
+        // Constant-first comparison so an unconfigured (null) mode selects unicast instead of
+        // throwing a NullPointerException.
+        GMetric.UDPAddressingMode mode = "multicast".equalsIgnoreCase(udpAddressingMode)
+                ? GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST;
+
+        try {
+            GMetric sender = new GMetric(group, port, mode, ttl);
+            reporter = builder.build(sender);
+        } catch (IOException ioe) {
+            LOG.error("Exception in GangliaReporter config", ioe);
+        }
+    }
+
+    /** Returns the configured {@link #GANGLIA_UDP_GROUP}, or {@code null} when absent. */
+    public static String getMetricsTargetUdpGroup(Map reporterConf) {
+        return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_GROUP), null);
+    }
+
+    /** Returns the configured {@link #GANGLIA_UDP_ADDRESSING_MODE}, or {@code null} when absent. */
+    public static String getMetricsTargetUdpAddressingMode(Map reporterConf) {
+        return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null);
+    }
+
+    /** Returns the configured {@link #GANGLIA_TTL}, or {@code null} when absent. */
+    public static Integer getMetricsTargetTtl(Map reporterConf) {
+        return ObjectReader.getInt(reporterConf.get(GANGLIA_TTL), null);
+    }
+
+    /** Returns the configured {@link #GANGLIA_DMAX}, or {@code null} when absent. */
+    public static Integer getGangliaDMax(Map reporterConf) {
+        return ObjectReader.getInt(reporterConf.get(GANGLIA_DMAX), null);
+    }
+
+    /** Returns the configured {@link #GANGLIA_TMAX}, or {@code null} when absent. */
+    public static Integer getGangliaTMax(Map reporterConf) {
+        return ObjectReader.getInt(reporterConf.get(GANGLIA_TMAX), null);
+    }
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return ObjectReader.getInt(reporterConf.get(GANGLIA_PORT), null);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return ObjectReader.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
new file mode 100644
index 0000000..57eb5a4
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GraphiteStormReporter extends ScheduledStormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
+    public static final String GRAPHITE_HOST = "graphite.host";
+    public static final String GRAPHITE_PORT = "graphite.port";
+    public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+    /**
+     * Configures a {@link GraphiteReporter} over the given registry from {@code reporterConf}.
+     *
+     * <p>Duration unit, rate unit, filter and prefix are applied only when their lookup
+     * returns a non-null value. The sender is a {@link GraphiteUDP} when the configured
+     * transport equals "udp" (case-insensitive) and a {@link Graphite} sender otherwise.
+     *
+     * @param metricsRegistry registry whose metrics are reported
+     * @param stormConf       not used by this reporter
+     * @param reporterConf    reporter-specific configuration
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        final GraphiteReporter.Builder graphiteBuilder = GraphiteReporter.forRegistry(metricsRegistry);
+
+        final TimeUnit durationsIn = ClientMetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (null != durationsIn) {
+            graphiteBuilder.convertDurationsTo(durationsIn);
+        }
+
+        final TimeUnit ratesIn = ClientMetricsUtils.getMetricsRateUnit(reporterConf);
+        if (null != ratesIn) {
+            graphiteBuilder.convertRatesTo(ratesIn);
+        }
+
+        final StormMetricsFilter metricsFilter = getMetricsFilter(reporterConf);
+        if (null != metricsFilter) {
+            graphiteBuilder.filter(metricsFilter);
+        }
+
+        final String metricPrefix = getMetricsPrefixedWith(reporterConf);
+        if (null != metricPrefix) {
+            graphiteBuilder.prefixedWith(metricPrefix);
+        }
+
+        // Schedule used by start(): the period falls back to 10, the unit to seconds.
+        reportingPeriod = getReportPeriod(reporterConf);
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Builder option deliberately not exposed through configuration: withClock(Clock).
+
+        final String targetHost = getMetricsTargetHost(reporterConf);
+        final Integer targetPort = getMetricsTargetPort(reporterConf);
+        final String transportName = getMetricsTargetTransport(reporterConf);
+
+        final GraphiteSender graphiteSender;
+        if (!transportName.equalsIgnoreCase("udp")) {
+            graphiteSender = new Graphite(targetHost, targetPort);
+        } else {
+            graphiteSender = new GraphiteUDP(targetHost, targetPort);
+        }
+        reporter = graphiteBuilder.build(graphiteSender);
+    }
+
+    /** Reads {@link #GRAPHITE_PREFIXED_WITH}; {@code null} when not configured. */
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        final Object configured = reporterConf.get(GRAPHITE_PREFIXED_WITH);
+        return ObjectReader.getString(configured, null);
+    }
+
+    /** Reads {@link #GRAPHITE_HOST}; {@code null} when not configured. */
+    private static String getMetricsTargetHost(Map reporterConf) {
+        final Object configured = reporterConf.get(GRAPHITE_HOST);
+        return ObjectReader.getString(configured, null);
+    }
+
+    /** Reads {@link #GRAPHITE_PORT}; {@code null} when not configured. */
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        final Object configured = reporterConf.get(GRAPHITE_PORT);
+        return ObjectReader.getInt(configured, null);
+    }
+
+    /** Reads {@link #GRAPHITE_TRANSPORT}; "tcp" when not configured. */
+    private static String getMetricsTargetTransport(Map reporterConf) {
+        final Object configured = reporterConf.get(GRAPHITE_TRANSPORT);
+        return ObjectReader.getString(configured, "tcp");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
new file mode 100644
index 0000000..f995f90
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmxStormReporter implements StormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class);
+
+    /** Reporter configuration key naming the JMX domain the metrics are registered under. */
+    public static final String JMX_DOMAIN = "jmx.domain";
+
+    /** Built by {@link #prepare}; remains {@code null} until then. */
+    JmxReporter reporter = null;
+
+    /**
+     * Configures a {@link JmxReporter} over the given registry from {@code reporterConf}.
+     *
+     * <p>Duration unit, rate unit, JMX domain and filter are applied only when their lookup
+     * returns a non-null value.
+     *
+     * @param metricsRegistry registry whose metrics are exposed
+     * @param stormConf       not used by this reporter
+     * @param reporterConf    reporter-specific configuration
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormConf,
+                        Map<String, Object> reporterConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        String domain = getMetricsJmxDomain(reporterConf);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+
+        StormMetricsFilter filter = ScheduledStormReporter.getMetricsFilter(reporterConf);
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        // other builder functions not exposed:
+        //  * createsObjectNamesWith(ObjectNameFactory onFactory)
+        //  * registerWith (MBeanServer)
+        //  * specificDurationUnits (Map<String,TimeUnit> specificDurationUnits)
+        //  * specificRateUnits(Map<String,TimeUnit> specificRateUnits)
+
+        reporter = builder.build();
+    }
+
+    /**
+     * Returns the JMX domain configured under {@link #JMX_DOMAIN}, or {@code null} when the
+     * key is absent.
+     *
+     * @param reporterConf reporter-specific configuration
+     */
+    public static String getMetricsJmxDomain(Map reporterConf) {
+        // Look the key up in the map and pass the VALUE to ObjectReader, like every other
+        // reporter getter does; passing the map itself with the key as the "default" never
+        // reads the configured domain.
+        return ObjectReader.getString(reporterConf.get(JMX_DOMAIN), null);
+    }
+
+    /**
+     * Starts the JMX reporter.
+     *
+     * @throws IllegalStateException if {@link #prepare} has not built a reporter yet
+     */
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Stops the JMX reporter.
+     *
+     * @throws IllegalStateException if {@link #prepare} has not built a reporter yet
+     */
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
new file mode 100644
index 0000000..61af5be
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ScheduledStormReporter implements StormReporter {
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
+    protected ScheduledReporter reporter;
+    protected long reportingPeriod;
+    protected TimeUnit reportingPeriodUnit;
+
+    /**
+     * Starts the underlying reporter on the configured period and unit.
+     *
+     * @throws IllegalStateException if no reporter has been assigned yet
+     */
+    @Override
+    public void start() {
+        if (reporter == null) {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+        LOG.debug("Starting...");
+        reporter.start(reportingPeriod, reportingPeriodUnit);
+    }
+
+    /**
+     * Stops the underlying reporter.
+     *
+     * @throws IllegalStateException if no reporter has been assigned yet
+     */
+    @Override
+    public void stop() {
+        if (reporter == null) {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+        LOG.debug("Stopping...");
+        reporter.stop();
+    }
+
+    /**
+     * Reads the reporting period unit stored under {@code REPORT_PERIOD_UNITS}.
+     *
+     * @return the configured unit, or {@link TimeUnit#SECONDS} when none is configured
+     */
+    public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+        final TimeUnit configuredUnit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
+        if (configuredUnit != null) {
+            return configuredUnit;
+        }
+        return TimeUnit.SECONDS;
+    }
+
+    /**
+     * Converts the string stored under {@code configName} to a {@link TimeUnit} constant via
+     * {@link TimeUnit#valueOf(String)}; yields {@code null} when the key has no value.
+     */
+    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
+        final String unitName = ObjectReader.getString(reporterConf.get(configName), null);
+        return unitName == null ? null : TimeUnit.valueOf(unitName);
+    }
+
+    /**
+     * Reads the reporting period stored under {@code REPORT_PERIOD}.
+     *
+     * @return the configured period, or 10 when none is configured
+     */
+    public static long getReportPeriod(Map reporterConf) {
+        final Integer period = ObjectReader.getInt(reporterConf.get(REPORT_PERIOD), 10);
+        return period.longValue();
+    }
+
+    /**
+     * Instantiates and prepares the metrics filter described by the {@code "filter"} entry of
+     * the reporter configuration.
+     *
+     * @return the prepared filter, or {@code null} when there is no {@code "filter"} entry or
+     *         it has no {@code "class"} value
+     */
+    public static StormMetricsFilter getMetricsFilter(Map reporterConf) {
+        final Map<String, Object> filterSettings = (Map) reporterConf.get("filter");
+        if (filterSettings == null) {
+            return null;
+        }
+        final String filterClassName = (String) filterSettings.get("class");
+        if (filterClassName == null) {
+            return null;
+        }
+        // The filter receives its own configuration sub-map, not the whole reporter config.
+        final StormMetricsFilter metricsFilter = ReflectionUtils.newInstance(filterClassName);
+        metricsFilter.prepare(filterSettings);
+        return metricsFilter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
new file mode 100644
index 0000000..907965a
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+/**
+ * A metrics reporter that is configured once via {@link #prepare} and then started and
+ * stopped. Also declares the configuration keys shared by reporters for the reporting period.
+ */
+public interface StormReporter extends Reporter {
+    /** Reporter configuration key for the reporting period. */
+    String REPORT_PERIOD = "report.period";
+    /** Reporter configuration key for the time unit of the reporting period. */
+    String REPORT_PERIOD_UNITS = "report.period.units";
+
+    /**
+     * Configures this reporter for the given registry.
+     *
+     * @param metricsRegistry registry whose metrics are to be reported
+     * @param conf            general configuration map
+     * @param reporterConf    configuration specific to this reporter
+     */
+    void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, 
Map<String, Object> reporterConf);
+
+    /** Starts reporting. */
+    void start();
+
+    /** Stops reporting. */
+    void stop();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 78246bb..fbf1e92 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -15,44 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.stats;
 
+import com.codahale.metrics.Counter;
 import com.google.common.collect.Lists;
-
+import java.util.List;
 import org.apache.storm.generated.BoltStats;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
-import java.util.List;
-
 @SuppressWarnings("unchecked")
 public class BoltExecutorStats extends CommonStats {
-
-    MultiCountStatAndMetric   ackedStats;
-    MultiCountStatAndMetric   failedStats;
     MultiCountStatAndMetric   executedStats;
     MultiLatencyStatAndMetric processLatencyStats;
     MultiLatencyStatAndMetric executeLatencyStats;
 
     public BoltExecutorStats(int rate,int numStatBuckets) {
         super(rate,numStatBuckets);
-        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
         this.executedStats = new MultiCountStatAndMetric(numStatBuckets);
         this.processLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
         this.executeLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
     }
 
-    public MultiCountStatAndMetric getAcked() {
-        return ackedStats;
-    }
-
-    public MultiCountStatAndMetric getFailed() {
-        return failedStats;
-    }
-
     public MultiCountStatAndMetric getExecuted() {
         return executedStats;
     }
@@ -67,8 +54,6 @@ public class BoltExecutorStats extends CommonStats {
 
     @Override
     public void cleanupStats() {
-        ackedStats.close();
-        failedStats.close();
         executedStats.close();
         processLatencyStats.close();
         executeLatencyStats.close();
@@ -81,16 +66,17 @@ public class BoltExecutorStats extends CommonStats {
         this.getExecuteLatencies().record(key, latencyMs);
     }
 
-    public void boltAckedTuple(String component, String stream, long 
latencyMs) {
+    public void boltAckedTuple(String component, String stream, long 
latencyMs, Counter ackedCounter) {
         List key = Lists.newArrayList(component, stream);
         this.getAcked().incBy(key, this.rate);
+        ackedCounter.inc(this.rate);
         this.getProcessLatencies().record(key, latencyMs);
     }
 
-    public void boltFailedTuple(String component, String stream, long 
latencyMs) {
+    public void boltFailedTuple(String component, String stream, long 
latencyMs, Counter failedCounter) {
         List key = Lists.newArrayList(component, stream);
         this.getFailed().incBy(key, this.rate);
-
+        failedCounter.inc(this.rate);
     }
 
     @Override
@@ -103,8 +89,8 @@ public class BoltExecutorStats extends CommonStats {
 
         // bolt stats
         BoltStats boltStats = new BoltStats(
-                StatsUtil.windowSetConverter(valueStat(ackedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-                StatsUtil.windowSetConverter(valueStat(failedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(getAcked()), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(getFailed()), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
                 StatsUtil.windowSetConverter(valueStat(processLatencyStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
                 StatsUtil.windowSetConverter(valueStat(executedStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
                 StatsUtil.windowSetConverter(valueStat(executeLatencyStats), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY));

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
index b6461c5..4c95da0 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.stats;
 
+import com.codahale.metrics.Counter;
 import java.util.Map;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
@@ -24,9 +26,10 @@ import 
org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
 @SuppressWarnings("unchecked")
 public abstract class CommonStats {
-
     private final MultiCountStatAndMetric emittedStats;
     private final MultiCountStatAndMetric transferredStats;
+    private final MultiCountStatAndMetric ackedStats;
+    private final MultiCountStatAndMetric failedStats;
 
     protected final int rate;
 
@@ -34,6 +37,16 @@ public abstract class CommonStats {
         this.rate = rate;
         this.emittedStats = new MultiCountStatAndMetric(numStatBuckets);
         this.transferredStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
+    }
+
+    public MultiCountStatAndMetric getFailed() {
+        return failedStats;
+    }
+
+    public MultiCountStatAndMetric getAcked() {
+        return ackedStats;
     }
 
     public int getRate() {
@@ -48,17 +61,21 @@ public abstract class CommonStats {
         return transferredStats;
     }
 
-    public void emittedTuple(String stream) {
+    public void emittedTuple(String stream, Counter emittedCounter) {
         this.getEmitted().incBy(stream, this.rate);
+        emittedCounter.inc(this.rate);
     }
 
-    public void transferredTuples(String stream, int amount) {
+    public void transferredTuples(String stream, int amount, Counter 
transferredCounter) {
         this.getTransferred().incBy(stream, this.rate * amount);
+        transferredCounter.inc(amount);
     }
 
     public void cleanupStats() {
         emittedStats.close();
         transferredStats.close();
+        ackedStats.close();
+        failedStats.close();
     }
 
     protected Map<String,Map<String,Long>> valueStat(MultiCountStatAndMetric 
metric) {

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java 
b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 6c3d589..2fc1502 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -15,55 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.stats;
 
+import com.codahale.metrics.Counter;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
 @SuppressWarnings("unchecked")
 public class SpoutExecutorStats extends CommonStats {
-
-    private final MultiCountStatAndMetric ackedStats;
-    private final MultiCountStatAndMetric failedStats;
     private final MultiLatencyStatAndMetric completeLatencyStats;
 
     public SpoutExecutorStats(int rate,int numStatBuckets) {
         super(rate,numStatBuckets);
-        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
         this.completeLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets);
     }
 
-    public MultiCountStatAndMetric getAcked() {
-        return ackedStats;
-    }
-
-    public MultiCountStatAndMetric getFailed() {
-        return failedStats;
-    }
-
     public MultiLatencyStatAndMetric getCompleteLatencies() {
         return completeLatencyStats;
     }
 
     @Override
     public void cleanupStats() {
-        ackedStats.close();
-        failedStats.close();
         completeLatencyStats.close();
         super.cleanupStats();
     }
 
-    public void spoutAckedTuple(String stream, long latencyMs) {
+    public void spoutAckedTuple(String stream, long latencyMs, Counter 
ackedCounter) {
         this.getAcked().incBy(stream, this.rate);
+        ackedCounter.inc(this.rate);
         this.getCompleteLatencies().record(stream, latencyMs);
     }
 
-    public void spoutFailedTuple(String stream, long latencyMs) {
+    public void spoutFailedTuple(String stream, long latencyMs, Counter 
failedCounter) {
         this.getFailed().incBy(stream, this.rate);
+        failedCounter.inc(this.rate);
     }
 
     @Override
@@ -76,7 +64,7 @@ public class SpoutExecutorStats extends CommonStats {
 
         // spout stats
         SpoutStats spoutStats = new SpoutStats(
-                valueStat(ackedStats), valueStat(failedStats), 
valueStat(completeLatencyStats));
+                valueStat(getAcked()), valueStat(getFailed()), 
valueStat(completeLatencyStats));
         ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java 
b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index 0a1765b..5985684 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -15,30 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.task;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.ICombiner;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IReducer;
-import org.apache.storm.metric.api.ICombiner;
 import org.apache.storm.metric.api.ReducedMetric;
-import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.state.ISubscribedState;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.NotImplementedException;
 import org.json.simple.JSONValue;
 
 /**
@@ -168,9 +173,9 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
     }
 
        /**
-        * Gets the declared output fields for the specified stream id for the
-        * component this task is a part of.
-        */
+     * Gets the declared output fields for the specified stream id for the
+     * component this task is a part of.
+     */
        public Fields getThisOutputFields(String streamId) {
                return getComponentOutputFields(getThisComponentId(), streamId);
        }
@@ -202,8 +207,8 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
     public int getThisTaskIndex() {
         List<Integer> tasks = new 
ArrayList<>(getComponentTasks(getThisComponentId()));
         Collections.sort(tasks);
-        for(int i=0; i<tasks.size(); i++) {
-            if(tasks.get(i) == getThisTaskId()) {
+        for (int i=0; i<tasks.size(); i++) {
+            if (tasks.get(i) == getThisTaskId()) {
                 return i;
             }
         }
@@ -329,6 +334,7 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
      * You must call this during `IBolt.prepare()` or `ISpout.open()`.
      * @return The IMetric argument unchanged.
      */
+    @Deprecated
     public <T extends IMetric> T registerMetric(String name, T metric, int 
timeBucketSizeInSecs) {
         if(_openOrPrepareWasCalled.get()) {
             throw new RuntimeException("TopologyContext.registerMetric can 
only be called from within overridden " +
@@ -349,18 +355,18 @@ public class TopologyContext extends 
WorkerTopologyContext implements IMetricsCo
         }
 
         Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = 
_registeredMetrics;
-        if(!m1.containsKey(timeBucketSizeInSecs)) {
+        if (!m1.containsKey(timeBucketSizeInSecs)) {
             m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, 
IMetric>>());
         }
 
         Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
-        if(!m2.containsKey(_taskId)) {
+        if (!m2.containsKey(_taskId)) {
             m2.put(_taskId, new HashMap<String, IMetric>());
         }
 
         Map<String, IMetric> m3 = m2.get(_taskId);
-        if(m3.containsKey(name)) {
-            throw new RuntimeException("The same metric name `" + name + "` 
was registered twice." );
+        if (m3.containsKey(name)) {
+            throw new RuntimeException("The same metric name `" + name + "` 
was registered twice.");
         } else {
             m3.put(name, metric);
         }
@@ -375,6 +381,7 @@ public class TopologyContext extends WorkerTopologyContext 
implements IMetricsCo
      *         cause the same metric name can register twice.
      *         So we just return the first metric we meet.
      */
+    @Deprecated
     public IMetric getRegisteredMetricByName(String name) {
         IMetric metric = null;
 
@@ -395,13 +402,40 @@ public class TopologyContext extends 
WorkerTopologyContext implements IMetricsCo
     /*
      * Convenience method for registering ReducedMetric.
      */
+    @Deprecated
     public ReducedMetric registerMetric(String name, IReducer reducer, int 
timeBucketSizeInSecs) {
         return registerMetric(name, new ReducedMetric(reducer), 
timeBucketSizeInSecs);
     }
+
     /*
      * Convenience method for registering CombinedMetric.
      */
+    @Deprecated
     public CombinedMetric registerMetric(String name, ICombiner combiner, int 
timeBucketSizeInSecs) {
         return registerMetric(name, new CombinedMetric(combiner), 
timeBucketSizeInSecs);
     }
+
+    public Timer registerTimer(String name) {
+        return StormMetricRegistry.registry().timer(metricName(name));
+    }
+
+    public Histogram registerHistogram(String name) {
+        return StormMetricRegistry.registry().histogram(metricName(name));
+    }
+
+    public Meter registerMeter(String name) {
+        return StormMetricRegistry.registry().meter(metricName(name));
+    }
+
+    public Counter registerCounter(String name) {
+        return StormMetricRegistry.registry().counter(metricName(name));
+    }
+
+    public Gauge registerGauge(String name, Gauge gauge) {
+        return StormMetricRegistry.registry().register(metricName(name), 
gauge);
+    }
+
+    private String metricName(String name) {
+        return StormMetricRegistry.metricName(name, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java 
b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 7a64812..3212df0 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -18,27 +18,34 @@
 
 package org.apache.storm.utils;
 
-import org.apache.storm.policy.IWaitStrategy;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.JcMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.policy.IWaitStrategy;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscArrayQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-
 public class JCQueue implements IStatefulObject {
     private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+    private static final String PREFIX = "jc-";
+    private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR 
= new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat(PREFIX + 
"metrics-reporter").build());
 
     public static final Object INTERRUPT = new Object();
 
     private final ExitCondition continueRunning = () -> true;
+    private final JcMetrics jcMetrics;
 
     private interface Inserter {
         // blocking call that can be interrupted using Thread.interrupt()
@@ -64,7 +71,7 @@ public class JCQueue implements IStatefulObject {
             int idleCount = 0;
             while (!inserted) {
                 q.metrics.notifyInsertFailure();
-                if (idleCount==0) { // check avoids multiple log msgs when in 
a idle loop
+                if (idleCount == 0) { // check avoids multiple log msgs when 
in a idle loop
                     LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. 
Entering BackPressure Wait", q.getName());
                 }
 
@@ -90,7 +97,6 @@ public class JCQueue implements IStatefulObject {
 
         @Override
         public void flush() throws InterruptedException {
-            return;
         }
 
         @Override
@@ -111,7 +117,7 @@ public class JCQueue implements IStatefulObject {
             this.currentBatch = new ArrayList<>(batchSz + 1);
         }
 
-        /** Blocking call - retires till element is successfully added */
+        /** Blocking call - retries till element is successfully added. */
         @Override
         public void publish(Object obj) throws InterruptedException {
             currentBatch.add(obj);
@@ -143,7 +149,7 @@ public class JCQueue implements IStatefulObject {
             int retryCount = 0;
             while (publishCount == 0) { // retry till at least 1 element is 
drained
                 q.metrics.notifyInsertFailure();
-                if (retryCount==0) { // check avoids multiple log msgs when in 
a idle loop
+                if (retryCount == 0) { // check avoids multiple log msgs when 
in a idle loop
                     LOG.debug("Experiencing Back Pressure when flushing batch 
to Q: {}. Entering BackPressure Wait.", q.getName());
                 }
                 retryCount = q.backPressureWaitStrategy.idle(retryCount);
@@ -236,7 +242,10 @@ public class JCQueue implements IStatefulObject {
     }
 
     private final MpscArrayQueue<Object> recvQueue;
-    private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds 
msgs from other workers (via WorkerTransfer), when recvQueue is full
+
+    // only holds msgs from other workers (via WorkerTransfer), when recvQueue 
is full
+    private final MpscUnboundedArrayQueue<Object> overflowQ;
+
     private final int overflowLimit; // ensures... overflowCount <= 
overflowLimit. if set to 0, disables overflow.
 
 
@@ -250,17 +259,28 @@ public class JCQueue implements IStatefulObject {
     private String queueName;
     private final IWaitStrategy backPressureWaitStrategy;
 
-    public JCQueue(String queueName, int size, int overflowLimit, int 
producerBatchSz, IWaitStrategy backPressureWaitStrategy) {
+    public JCQueue(String queueName, int size, int overflowLimit, int 
producerBatchSz, IWaitStrategy backPressureWaitStrategy,
+                   String topologyId, String componentId, Integer taskId, int 
port) {
         this.queueName = queueName;
         this.overflowLimit = overflowLimit;
         this.recvQueue = new MpscArrayQueue<>(size);
         this.overflowQ = new MpscUnboundedArrayQueue<>(size);
 
         this.metrics = new JCQueue.QueueMetrics();
+        this.jcMetrics = StormMetricRegistry.jcMetrics(queueName, topologyId, 
componentId, taskId, port);
 
         //The batch size can be no larger than half the full recvQueue size, 
to avoid contention issues.
         this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 
2));
         this.backPressureWaitStrategy = backPressureWaitStrategy;
+
+        if (!METRICS_REPORTER_EXECUTOR.isShutdown()) {
+            METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    jcMetrics.set(metrics);
+                }
+            }, 15, 15, TimeUnit.SECONDS);
+        }
     }
 
     public String getName() {
@@ -271,6 +291,7 @@ public class JCQueue implements IStatefulObject {
     public boolean haltWithInterrupt() {
         boolean res = tryPublishInternal(INTERRUPT);
         metrics.close();
+        METRICS_REPORTER_EXECUTOR.shutdown();
         return res;
     }
 
@@ -294,15 +315,18 @@ public class JCQueue implements IStatefulObject {
         }
     }
 
-    public int size() { return recvQueue.size() + overflowQ.size(); }
+    public int size() {
+        return recvQueue.size() + overflowQ.size();
+    }
 
     /**
      * Non blocking. Returns immediately if Q is empty. Returns number of 
elements consumed from Q
+     *  @param consumer
      *  @param exitCond
      */
     private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws 
InterruptedException {
         int drainCount = 0;
-        while ( exitCond.keepRunning() ) {
+        while (exitCond.keepRunning()) {
             Object tuple = recvQueue.poll();
             if (tuple == null) {
                 break;
@@ -374,7 +398,7 @@ public class JCQueue implements IStatefulObject {
     }
 
     /**
-     * Non-blocking call, returns false if full
+     * Non-blocking call, returns false if full.
      **/
     public boolean tryPublish(Object obj) {
         Inserter inserter = getInserter();
@@ -391,7 +415,7 @@ public class JCQueue implements IStatefulObject {
      * returns false if overflowLimit has reached
      */
     public boolean tryPublishToOverflow(Object obj) {
-        if (overflowLimit>0 && overflowQ.size() >= overflowLimit) {
+        if (overflowLimit > 0 && overflowQ.size() >= overflowLimit) {
             return false;
         }
         overflowQ.add(obj);

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java 
b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index 34adbc4..cb99870 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -46,9 +46,11 @@ import org.slf4j.LoggerFactory;
 public class ConfigValidation {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConfigValidation.class);
 
-    public static abstract class Validator {
+    public abstract static class Validator {
         public Validator(Map<String, Object> params) {}
+
         public Validator() {}
+
         public abstract void validateField(String name, Object o);
     }
 
@@ -57,7 +59,7 @@ public class ConfigValidation {
      */
 
     /**
-     * Validates if an object is not null
+     * Validates if an object is not null.
      */
 
     public static class NotNullValidator extends Validator {
@@ -71,7 +73,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates basic types
+     * Validates basic types.
      */
     public static class SimpleTypeValidator extends Validator {
 
@@ -93,7 +95,8 @@ public class ConfigValidation {
             if (type.isInstance(o)) {
                 return;
             }
-            throw new IllegalArgumentException("Field " + name + " must be of 
type " + type + ". Object: " + o + " actual type: " + o.getClass());
+            throw new IllegalArgumentException(
+                    "Field " + name + " must be of type " + type + ". Object: 
" + o + " actual type: " + o.getClass());
         }
     }
 
@@ -137,9 +140,10 @@ public class ConfigValidation {
 
         public StringValidator(Map<String, Object> params) {
 
-            this.acceptedValues = new 
HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
+            this.acceptedValues =
+                    new 
HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
 
-            if(this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 
1 && this.acceptedValues.contains(""))) {
+            if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() 
== 1 && this.acceptedValues.contains(""))) {
                 this.acceptedValues = null;
             }
         }
@@ -147,7 +151,7 @@ public class ConfigValidation {
         @Override
         public void validateField(String name, Object o) {
             SimpleTypeValidator.validateField(name, String.class, o);
-            if(this.acceptedValues != null) {
+            if (this.acceptedValues != null) {
                 if (!this.acceptedValues.contains((String) o)) {
                     throw new IllegalArgumentException("Field " + name + " is 
not an accepted value. Value: " + o + " Accepted values: " + 
this.acceptedValues);
                 }
@@ -194,8 +198,8 @@ public class ConfigValidation {
                 return;
             }
             final long i;
-            if (o instanceof Number &&
-                    (i = ((Number) o).longValue()) == ((Number) 
o).doubleValue()) {
+            if (o instanceof Number
+                    && (i = ((Number) o).longValue()) == ((Number) 
o).doubleValue()) {
                 if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) {
                     return;
                 }
@@ -205,7 +209,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates an entry for ImpersonationAclUser
+     * Validates an entry for ImpersonationAclUser.
      */
     public static class ImpersonationAclUserEntryValidator extends Validator {
 
@@ -214,7 +218,8 @@ public class ConfigValidation {
             if (o == null) {
                 return;
             }
-            ConfigValidationUtils.NestableFieldValidator validator = 
ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
+            ConfigValidationUtils.NestableFieldValidator validator =
+                    
ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
                     ConfigValidationUtils.listFv(String.class, false), false);
             validator.validateField(name, o);
             @SuppressWarnings("unchecked")
@@ -229,7 +234,7 @@ public class ConfigValidation {
     }
 
     /**
-     * validates a list of has no duplicates
+     * Validates that a list has no duplicates.
      */
     public static class NoDuplicateInListValidator extends Validator {
 
@@ -251,7 +256,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates a String or a list of Strings
+     * Validates a String or a list of Strings.
      */
     public static class StringOrStringListValidator extends Validator {
 
@@ -269,7 +274,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates Kryo Registration
+     * Validates Kryo Registration.
      */
     public static class KryoRegValidator extends Validator {
 
@@ -283,8 +288,8 @@ public class ConfigValidation {
                 for (Object e : (Iterable<?>) o) {
                     if (e instanceof Map) {
                         for (Map.Entry<Object, Object> entry : ((Map<Object, 
Object>) e).entrySet()) {
-                            if (!(entry.getKey() instanceof String) ||
-                                    !(entry.getValue() instanceof String)) {
+                            if (!(entry.getKey() instanceof String)
+                                    || !(entry.getValue() instanceof String)) {
                                 throw new IllegalArgumentException(
                                         "Each element of the list " + name + " 
must be a String or a Map of Strings");
                             }
@@ -302,7 +307,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates if a number is a power of 2
+     * Validates if a number is a power of 2.
      */
     public static class PowerOf2Validator extends Validator {
 
@@ -312,8 +317,8 @@ public class ConfigValidation {
                 return;
             }
             final long i;
-            if (o instanceof Number &&
-                    (i = ((Number) o).longValue()) == ((Number) 
o).doubleValue()) {
+            if (o instanceof Number
+                    && (i = ((Number) o).longValue()) == ((Number) 
o).doubleValue()) {
                 // Test whether the integer is a power of 2.
                 if (i > 0 && (i & (i - 1)) == 0) {
                     return;
@@ -324,7 +329,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates each entry in a list
+     * Validates each entry in a list.
      */
     public static class ListEntryTypeValidator extends Validator {
 
@@ -346,10 +351,10 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates each entry in a list against a list of custom Validators
+     * Validates each entry in a list against a list of custom Validators.
      * Each validator in the list of validators must inherit or be an instance 
of Validator class
      */
-    public static class ListEntryCustomValidator extends Validator{
+    public static class ListEntryCustomValidator extends Validator {
 
         private Class<?>[] entryValidators;
 
@@ -366,7 +371,8 @@ public class ConfigValidation {
             }
         }
 
-        public static void validateField(String name, Class<?>[] validators, 
Object o) throws IllegalAccessException, InstantiationException, 
NoSuchMethodException, InvocationTargetException {
+        public static void validateField(String name, Class<?>[] validators, 
Object o)
+                throws IllegalAccessException, InstantiationException, 
NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
@@ -386,9 +392,9 @@ public class ConfigValidation {
     }
 
     /**
-     * validates each key and value in a map of a certain type
+     * Validates each key and value in a map of a certain type.
      */
-    public static class MapEntryTypeValidator extends Validator{
+    public static class MapEntryTypeValidator extends Validator {
 
         private Class<?> keyType;
         private Class<?> valueType;
@@ -410,9 +416,9 @@ public class ConfigValidation {
     }
 
     /**
-     * validates each key and each value against the respective arrays of 
validators
+     * Validates each key and each value against the respective arrays of 
validators.
      */
-    public static class MapEntryCustomValidator extends Validator{
+    public static class MapEntryCustomValidator extends Validator {
 
         private Class<?>[] keyValidators;
         private Class<?>[] valueValidators;
@@ -432,7 +438,8 @@ public class ConfigValidation {
         }
 
         @SuppressWarnings("unchecked")
-        public static void validateField(String name, Class<?>[] 
keyValidators, Class<?>[] valueValidators, Object o) throws 
IllegalAccessException, InstantiationException, NoSuchMethodException, 
InvocationTargetException {
+        public static void validateField(String name, Class<?>[] 
keyValidators, Class<?>[] valueValidators, Object o)
+                throws IllegalAccessException, InstantiationException, 
NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
@@ -460,9 +467,9 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates a positive number
+     * Validates a positive number.
      */
-    public static class PositiveNumberValidator extends Validator{
+    public static class PositiveNumberValidator extends Validator {
 
         private boolean includeZero;
 
@@ -484,7 +491,7 @@ public class ConfigValidation {
                 return;
             }
             if (o instanceof Number) {
-                if(includeZero) {
+                if (includeZero) {
                     if (((Number) o).doubleValue() >= 0.0) {
                         return;
                     }
@@ -518,14 +525,14 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            if(o == null) {
+            if (o == null) {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map<?, ?>) o).containsKey("class") ) {
-                throw new IllegalArgumentException( "Field " + name + " must 
have map entry with key: class");
+            if (!((Map<?, ?>) o).containsKey("class")) {
+                throw new IllegalArgumentException("Field " + name + " must 
have map entry with key: class");
             }
-            if(!((Map<?, ?>) o).containsKey("parallelism.hint") ) {
+            if (!((Map<?, ?>) o).containsKey("parallelism.hint")) {
                 throw new IllegalArgumentException("Field " + name + " must 
have map entry with key: parallelism.hint");
             }
 
@@ -534,21 +541,65 @@ public class ConfigValidation {
         }
     }
 
+    /**
+     * Validates a single metrics reporter definition.
+     *
+     * <p>The definition must be a map with a "class" entry and a "daemons" entry.
+     * "daemons" must be a list in which every entry is "nimbus", "supervisor" or
+     * "worker". A "filter" entry is optional; when the key is present its value
+     * must be a map. A null field value is accepted without any further checks.
+     */
+    public static class MetricReportersValidator extends Validator {
+        private static final String NIMBUS = "nimbus";
+        private static final String SUPERVISOR = "supervisor";
+        private static final String WORKER = "worker";
+        private static final String CLASS = "class";
+        private static final String FILTER = "filter";
+        private static final String DAEMONS = "daemons";
+
+        /**
+         * Checks one reporter definition. Type checks on the field itself and on
+         * the "class" entries are delegated to SimpleTypeValidator.
+         *
+         * @param name name of the config field being validated
+         * @param o    value of the field; null is treated as valid
+         * @throws IllegalArgumentException if "class" or "daemons" is missing, if
+         *     "daemons" is not a list of known daemon names, or if "filter" is
+         *     present but is not a map
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            Map<?, ?> reporter = (Map<?, ?>) o;
+            if (!reporter.containsKey(CLASS)) {
+                throw new IllegalArgumentException("Field " + name
+                        + " must have map entry with key: class");
+            }
+            if (!reporter.containsKey(DAEMONS)) {
+                throw new IllegalArgumentException("Field " + name
+                        + " must have map entry with key: daemons");
+            }
+            Object daemons = reporter.get(DAEMONS);
+            if (!(daemons instanceof List)) {
+                throw new IllegalArgumentException("Field 'daemons' must be a non-null list.");
+            }
+            // daemons can only be 'nimbus', 'supervisor', or 'worker'; a null or
+            // non-String entry matches none of the constants and is rejected too.
+            for (Object daemon : (List<?>) daemons) {
+                if (!NIMBUS.equals(daemon) && !SUPERVISOR.equals(daemon)
+                        && !WORKER.equals(daemon)) {
+                    throw new IllegalArgumentException("Field 'daemons' may only contain the following:"
+                            + " \"nimbus\", \"supervisor\", or \"worker\"");
+                }
+            }
+            if (reporter.containsKey(FILTER)) {
+                Object filter = reporter.get(FILTER);
+                // Fail with a validation error rather than a ClassCastException or
+                // NullPointerException when the filter entry is malformed.
+                if (!(filter instanceof Map)) {
+                    throw new IllegalArgumentException("Field 'filter' must be a non-null map.");
+                }
+                SimpleTypeValidator.validateField(CLASS, String.class,
+                        ((Map<?, ?>) filter).get(CLASS));
+            }
+            SimpleTypeValidator.validateField(name, String.class, reporter.get(CLASS));
+        }
+    }
+
     public static class EventLoggerRegistryValidator extends Validator {
 
         @Override
         public void validateField(String name, Object o) {
-            if(o == null) {
+            if (o == null) {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map<?, ?>) o).containsKey("class") ) {
-                throw new IllegalArgumentException( "Field " + name + " must 
have map entry with key: class");
+            if (!((Map<?, ?>) o).containsKey("class")) {
+                throw new IllegalArgumentException("Field " + name + " must 
have map entry with key: class");
             }
 
             SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) 
o).get("class"));
 
-            if(((Map<?, ?>) o).containsKey("arguments") ) {
+            if (((Map<?, ?>) o).containsKey("arguments")) {
                 SimpleTypeValidator.validateField(name, Map.class, ((Map<?, 
?>) o).get("arguments"));
             }
         }
@@ -570,10 +621,9 @@ public class ConfigValidation {
                 throw new IllegalArgumentException("Field " + name + " must be 
set.");
             }
 
-            if (o instanceof String &&
-                    (((String) o).equals("NONE") ||
-                            ((String) o).equals("DIGEST") ||
-                            ((String) o).equals("KERBEROS"))) {
+            if (o instanceof String
+                    && (((String) o).equals("NONE") || ((String) 
o).equals("DIGEST")
+                    || ((String) o).equals("KERBEROS"))) {
                 return;
             }
             throw new IllegalArgumentException("Field " + name + " must be one 
of \"NONE\", \"DIGEST\", or \"KERBEROS\"");
@@ -684,9 +734,9 @@ public class ConfigValidation {
 
     private static List<Class<?>> configClasses = null;
     //We follow the model of service loaders (Even though it is not a service).
-    private static final String CONFIG_CLASSES_NAME = 
"META-INF/services/"+Validated.class.getName();
+    private static final String CONFIG_CLASSES_NAME = "META-INF/services/" + 
Validated.class.getName();
     
-    private synchronized static List<Class<?>> getConfigClasses() {
+    private static synchronized List<Class<?>> getConfigClasses() {
         if (configClasses == null) {
             List<Class<?>> ret = new ArrayList<>();
             Set<String> classesToScan = new HashSet<>();
@@ -695,7 +745,7 @@ public class ConfigValidation {
                 try {
                     try (BufferedReader in = new BufferedReader(new 
InputStreamReader(url.openStream()))) {
                         String line;
-                        while((line = in.readLine()) != null) {
+                        while ((line = in.readLine()) != null) {
                             line = line.replaceAll("#.*$", "").trim();
                             if (!line.isEmpty()) {
                                 classesToScan.add(line);
@@ -720,7 +770,7 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates a field given field name as string
+     * Validates a field given field name as string.
      *
      * @param fieldName   provided as a string
      * @param conf        map of confs
@@ -783,9 +833,7 @@ public class ConfigValidation {
                     //If validator has a constructor that takes a Map as an 
argument call that constructor
                     if (hasConstructor(clazz, Map.class)) {
                         o = 
clazz.getConstructor(Map.class).newInstance(params);
-                    }
-                    //If not call default constructor
-                    else {
+                    } else { //If not call default constructor
                         o = clazz.newInstance();
                     }
                     o.validateField(field.getName(), conf.get(key));
@@ -797,7 +845,7 @@ public class ConfigValidation {
     }
     
     /**
-     * Validate all confs in map
+     * Validate all confs in map.
      *
      * @param conf map of configs
      */
@@ -811,15 +859,15 @@ public class ConfigValidation {
     private static final int ACC_STATIC = 0x0008;
     private static final int ACC_FINAL  = 0x0010;
     private static final int DESIRED_FIELD_ACC = ACC_PUBLIC | ACC_STATIC | 
ACC_FINAL;
+
     public static boolean isFieldAllowed(Field field) {
-        return field.getAnnotation(NotConf.class) == null &&
-                String.class.equals(field.getType()) &&
-                ((field.getModifiers() & DESIRED_FIELD_ACC) == 
DESIRED_FIELD_ACC) &&
-                !field.isSynthetic();
+        return field.getAnnotation(NotConf.class) == null
+                && String.class.equals(field.getType())
+                && ((field.getModifiers() & DESIRED_FIELD_ACC) == 
DESIRED_FIELD_ACC) && !field.isSynthetic();
     }
     
     /**
-     * Validate all confs in map
+     * Validate all confs in map.
      *
      * @param conf        map of configs
      * @param classes config class
@@ -848,9 +896,10 @@ public class ConfigValidation {
         }
     }
 
-    private static Map<String,Object> getParamsFromAnnotation(Class<?> 
validatorClass, Object v) throws InvocationTargetException, 
IllegalAccessException {
+    private static Map<String,Object> getParamsFromAnnotation(Class<?> 
validatorClass, Object v)
+            throws InvocationTargetException, IllegalAccessException {
         Map<String, Object> params = new HashMap<String, Object>();
-        for(Method method : validatorClass.getDeclaredMethods()) {
+        for (Method method : validatorClass.getDeclaredMethods()) {
 
             Object value = null;
             try {
@@ -858,7 +907,7 @@ public class ConfigValidation {
             } catch (IllegalArgumentException ex) {
                 value = null;
             }
-            if(value != null) {
+            if (value != null) {
                 params.put(method.getName(), value);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 29ba179..e48d090 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -46,7 +46,7 @@ public class JCQueueBackpressureTest extends TestCase {
     }
 
     private static JCQueue createQueue(String name, int queueSize) {
-        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0));
+        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), 
"test", "test",1000, 1000);
     }
 
     private static class TestConsumer implements Consumer {

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index bdc4937..3d48de2 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -200,10 +200,10 @@ public class JCQueueTest {
     }
 
     private JCQueue createQueue(String name, int queueSize) {
-        return new JCQueue(name, queueSize, 0, 1, waitStrategy);
+        return createQueue(name, 1, queueSize);
     }
 
     private JCQueue createQueue(String name, int batchSize, int queueSize) {
-        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy);
+        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, 
"test", "test",1000, 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index ec1815d..c6cff6b 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -18,19 +18,22 @@
 
 package org.apache.storm;
 
+import static 
org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import static 
org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
 import static org.apache.storm.validation.ConfigValidationAnnotations.isString;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
-import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
-import static 
org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
-import static 
org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
 import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
+import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
@@ -42,9 +45,6 @@ import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.Validated;
 
-import java.util.ArrayList;
-import java.util.Map;
-
 /**
  * Storm configs are specified as a plain old map. This class provides 
constants for
  * all the configurations possible on a Storm cluster. Each constant is paired 
with an annotation
@@ -69,32 +69,12 @@ public class DaemonConfig implements Validated {
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = 
"storm.daemon.metrics.reporter.plugins";
 
     /**
-     * A specify Locale for daemon metrics reporter plugin.
-     * Use the specified IETF BCP 47 language tag string for a Locale.
-     */
-    @isString
-    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = 
"storm.daemon.metrics.reporter.plugin.locale";
-
-    /**
      * A specify domain for daemon metrics reporter plugin to limit reporting 
to specific domain.
      */
     @isString
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = 
"storm.daemon.metrics.reporter.plugin.domain";
 
     /**
-     * A specify rate-unit in TimeUnit to specify reporting frequency for 
daemon metrics reporter plugin.
-     */
-    @isString
-    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT 
= "storm.daemon.metrics.reporter.plugin.rate.unit";
-
-    /**
-     * A specify duration-unit in TimeUnit to specify reporting window for 
daemon metrics reporter plugin.
-     */
-    @isString
-    public static final String 
STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = 
"storm.daemon.metrics.reporter.plugin.duration.unit";
-
-
-    /**
      * A specify csv reporter directory for CvsPreparableReporter daemon 
metrics reporter.
      */
     @isString
@@ -303,7 +283,7 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = 
"nimbus.assignments.service.thread.queue.size";
 
     /**
-     * class controls heartbeats recovery strategy
+     * Class that controls the heartbeats recovery strategy.
      */
     @isString
     public static final String 
NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = 
"nimbus.worker.heartbeats.recovery.strategy.class";
@@ -829,7 +809,8 @@ public class DaemonConfig implements Validated {
      */
     @NotNull
     @isPositiveNumber
-    public static final String 
STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = 
"storm.cluster.metrics.consumer.publish.interval.secs";
+    public static final String 
STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS =
+            "storm.cluster.metrics.consumer.publish.interval.secs";
 
     /**
      * Enables user-first classpath. See topology.classpath.beginning.
@@ -863,8 +844,8 @@ public class DaemonConfig implements Validated {
 
     /**
      * For ArtifactoryConfigLoader, this can either be a reference to an 
individual file in Artifactory or to a directory.
-     * If it is a directory, the file with the largest lexographic name will 
be returned. Users need to add "artifactory+" to the beginning of
-     * the real URI to use ArtifactoryConfigLoader.
+     * If it is a directory, the file with the largest lexicographic name will 
be returned. Users need to add "artifactory+"
+     * to the beginning of the real URI to use ArtifactoryConfigLoader.
      * For FileConfigLoader, this is the URI pointing to a file.
      */
     @isString

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
index b5bd8bc..85ee27f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -15,8 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.metrics;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
@@ -26,15 +32,8 @@ import org.apache.storm.utils.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 public class MetricsUtils {
-    private final static Logger LOG = 
LoggerFactory.getLogger(MetricsUtils.class);
+    // Logger is created from this class so its output is attributed to MetricsUtils.
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsUtils.class);
 
     public static List<PreparableReporter> getPreparableReporters(Map<String, 
Object> topoConf) {
         List<String> clazzes = (List<String>) 
topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
@@ -60,30 +59,6 @@ public class MetricsUtils {
         return reporter;
     }
 
-    public static Locale getMetricsReporterLocale(Map<String, Object> 
topoConf) {
-        String languageTag = 
ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE),
 null);
-        if (languageTag != null) {
-            return Locale.forLanguageTag(languageTag);
-        }
-        return null;
-    }
-
-    public static TimeUnit getMetricsRateUnit(Map<String, Object> topoConf) {
-        return getTimeUnitForCofig(topoConf, 
DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
-    }
-
-    public static TimeUnit getMetricsDurationUnit(Map<String, Object> 
topoConf) {
-        return getTimeUnitForCofig(topoConf, 
DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
-    }
-
-    private static TimeUnit getTimeUnitForCofig(Map<String, Object> topoConf, 
String configName) {
-        String rateUnitString = 
ObjectReader.getString(topoConf.get(configName), null);
-        if (rateUnitString != null) {
-            return TimeUnit.valueOf(rateUnitString);
-        }
-        return null;
-    }
-
     public static File getCsvLogDir(Map<String, Object> topoConf) {
         String csvMetricsLogDirectory = 
ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR),
 null);
         if (csvMetricsLogDirectory == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index bc34b4b..cd6d069 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -15,20 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.ConsoleReporter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsolePreparableReporter implements 
PreparableReporter<ConsoleReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(ConsolePreparableReporter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConsolePreparableReporter.class);
     ConsoleReporter reporter = null;
 
     @Override
@@ -37,17 +37,17 @@ public class ConsolePreparableReporter implements 
PreparableReporter<ConsoleRepo
         ConsoleReporter.Builder builder = 
ConsoleReporter.forRegistry(metricsRegistry);
 
         builder.outputTo(System.out);
-        Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf);
+        Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf);
         if (locale != null) {
             builder.formattedFor(locale);
         }
 
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf);
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf);
         if (rateUnit != null) {
             builder.convertRatesTo(rateUnit);
         }
 
-        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf);
+        TimeUnit durationUnit = 
ClientMetricsUtils.getMetricsDurationUnit(topoConf);
         if (durationUnit != null) {
             builder.convertDurationsTo(durationUnit);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index b67cc65..3ab6a99 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -15,21 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.CsvReporter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(CsvPreparableReporter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CsvPreparableReporter.class);
     CsvReporter reporter = null;
 
     @Override
@@ -37,17 +38,17 @@ public class CsvPreparableReporter implements 
PreparableReporter<CsvReporter> {
         LOG.debug("Preparing...");
         CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
 
-        Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf);
+        Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf);
         if (locale != null) {
             builder.formatFor(locale);
         }
 
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf);
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf);
         if (rateUnit != null) {
             builder.convertRatesTo(rateUnit);
         }
 
-        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf);
+        TimeUnit durationUnit = 
ClientMetricsUtils.getMetricsDurationUnit(topoConf);
         if (durationUnit != null) {
             builder.convertDurationsTo(durationUnit);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index 21aab16..c369879 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -15,21 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.ClientMetricsUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(JmxPreparableReporter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(JmxPreparableReporter.class);
     JmxReporter reporter = null;
 
     @Override
@@ -40,7 +40,7 @@ public class JmxPreparableReporter implements 
PreparableReporter<JmxReporter> {
         if (domain != null) {
             builder.inDomain(domain);
         }
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf);
+        TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf);
         if (rateUnit != null) {
             builder.convertRatesTo(rateUnit);
         }

Reply via email to