Adding Csv and Console statistics reporter plugins

Make statistics reporter plugins a list.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea6cccfc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea6cccfc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea6cccfc

Branch: refs/heads/1.x-branch
Commit: ea6cccfcab257fe6854867c2b54029378953080a
Parents: 6868233
Author: Kishor Patil <[email protected]>
Authored: Wed Feb 3 13:47:09 2016 -0600
Committer: Kishor Patil <[email protected]>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 conf/defaults.yaml                              |  4 +
 .../src/clj/org/apache/storm/daemon/common.clj  |  9 ++-
 storm-core/src/jvm/org/apache/storm/Config.java |  2 +-
 .../storm/statistics/StatisticsUtils.java       | 27 +++++--
 .../reporters/ConsolePreparableReporter.java    | 65 ++++++++++++++++
 .../reporters/CsvPreparableReporter.java        | 80 ++++++++++++++++++++
 .../reporters/JMXPreparableReporter.java        | 49 ------------
 .../reporters/JmxPreparableReporter.java        | 56 ++++++++++++++
 8 files changed, 234 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8873d12..b468290 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -281,3 +281,7 @@ pacemaker.thread.timeout: 10
 pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
+
+#default plugin for daemon statistics reporter
+storm.statistics.preparable.reporter.plugin:
+     - "org.apache.storm.statistics.reporters.JmxPreparableReporter"

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 44a1d43..6b7d539 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -33,12 +33,17 @@
   (:require [org.apache.storm.thrift :as thrift])
   (:require [metrics.core  :refer [default-registry]]))
 
-(defn start-metrics-reporters [conf]
-  (doto (StatisticsUtils/getPreparableReporter conf)
+(defn start-metrics-reporter [reporter conf]
+  (doto reporter
     (.prepare default-registry conf)
     (.start))
   (log-message "Started statistics report plugin..."))
 
+(defn start-metrics-reporters [conf]
+  (doseq [reporter (StatisticsUtils/getPreparableReporters conf)]
+    (start-metrics-reporter reporter conf)))
+
+
 (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
 (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
 (def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index bf50223..9d18667 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -143,7 +143,7 @@ public class Config extends HashMap<String, Object> {
      * A list of statistics  preparable reporter class names.
      */
     @NotNull
-    @isImplementationOfClass(implementsClass = 
org.apache.storm.statistics.reporters.PreparableReporter.class)
+    @isStringList
     public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN = 
"storm.statistics.preparable.reporter.plugin";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java 
b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
index 19f7690..666e44d 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -1,25 +1,40 @@
 package org.apache.storm.statistics;
 
 import org.apache.storm.Config;
-import org.apache.storm.statistics.reporters.JMXPreparableReporter;
+import org.apache.storm.statistics.reporters.JmxPreparableReporter;
 import org.apache.storm.statistics.reporters.PreparableReporter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class StatisticsUtils {
     private final static Logger LOG = 
LoggerFactory.getLogger(StatisticsUtils.class);
 
-    public static PreparableReporter getPreparableReporter(Map stormConf) {
-        PreparableReporter reporter = new JMXPreparableReporter();
-        String clazz = (String) 
stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+    public static List<PreparableReporter> getPreparableReporters(Map 
stormConf) {
+        PreparableReporter reporter = new JmxPreparableReporter();
+        List<String> clazzes = (List<String>) 
stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for(String clazz: clazzes ) {
+                reporterList.add(getPreparableReporter(clazz));
+            }
+        }
+        if(reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        PreparableReporter reporter = null;
         LOG.info("Using statistics reporter plugin:" + clazz);
         if(clazz != null) {
             reporter = (PreparableReporter) Utils.newInstance(clazz);
-        } else {
-            reporter = new JMXPreparableReporter();
         }
         return reporter;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
new file mode 100644
index 0000000..f545b5b
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
@@ -0,0 +1,65 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsolePreparableReporter implements 
PreparableReporter<ConsoleReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(ConsolePreparableReporter.class);
+    ConsoleReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        ConsoleReporter.Builder builder = 
ConsoleReporter.forRegistry(metricsRegistry);
+        PrintStream stream = (PrintStream)stormConf.get(":stream");
+        if (stream != null) {
+            builder.outputTo(stream);
+        }
+        Locale locale = (Locale)stormConf.get(":locale");
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), 
null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
new file mode 100644
index 0000000..610df33
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
@@ -0,0 +1,80 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(CsvPreparableReporter.class);
+    CsvReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = (Locale) stormConf.get(":locale");
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), 
null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        String localStormDirLocation = 
Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
+        File logDir = new File(localStormDirLocation + "csvmetrics" );
+        validateCreateOutputDir(logDir);
+        reporter = builder.build(logDir);
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
+        }
+    }
+
+
+    private void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have 
write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a 
directory.");
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
deleted file mode 100644
index 5d94ffc..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JMXPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(JMXPreparableReporter.class);
-
-    JmxReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
-        if (domain != null) {
-            builder.inDomain(domain);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-
-    }
-
-    @Override
-    public void start() {
-        LOG.info("Starting...");
-        reporter.start();
-    }
-
-    @Override
-    public void stop() {
-        LOG.info("Stopping...");
-        reporter.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
new file mode 100644
index 0000000..ba59611
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
@@ -0,0 +1,56 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
+    private final static Logger LOG = 
LoggerFactory.getLogger(JmxPreparableReporter.class);
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+        String domain = Utils.getString(stormConf.get(":domain"), null);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
+        }
+    }
+}

Reply via email to