Repository: storm
Updated Branches:
refs/heads/1.x-branch ef7581b19 -> 1582c220a
Create stats plugin for JMX
Add config for stats reporter plugin and use it
Use Regular Map instead of Config in interface
Adding log entries for statiscs plugin actions.
Conflicts:
storm-core/src/clj/org/apache/storm/daemon/common.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68682331
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68682331
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68682331
Branch: refs/heads/1.x-branch
Commit: 68682331a219c6d2ad706729389d6eba2585c32c
Parents: ef7581b
Author: Kishor Patil <[email protected]>
Authored: Wed Feb 3 10:49:56 2016 -0600
Committer: Kishor Patil <[email protected]>
Committed: Fri Feb 5 19:28:08 2016 +0000
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 12 +++--
.../src/clj/org/apache/storm/daemon/drpc.clj | 2 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 2 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 2 +-
.../clj/org/apache/storm/daemon/supervisor.clj | 2 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +-
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++
.../storm/statistics/StatisticsUtils.java | 26 +++++++++++
.../reporters/JMXPreparableReporter.java | 49 ++++++++++++++++++++
.../reporters/PreparableReporter.java | 15 ++++++
10 files changed, 111 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index dd761a5..44a1d43 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -19,6 +19,9 @@
InvalidTopologyException GlobalStreamId]
[org.apache.storm.utils ThriftTopologyUtils])
(:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.statistics.reporters PreparableReporter]
+ [com.codahale.metrics MetricRegistry]
+ [org.apache.storm.statistics StatisticsUtils])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.metric SystemBolt])
@@ -28,10 +31,13 @@
(:require [clojure.set :as set])
(:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
- (:require [metrics.reporters.jmx :as jmx]))
+ (:require [metrics.core :refer [default-registry]]))
-(defn start-metrics-reporters []
- (jmx/start (jmx/reporter {})))
+(defn start-metrics-reporters [conf]
+ (doto (StatisticsUtils/getPreparableReporter conf)
+ (.prepare default-registry conf)
+ (.start))
+ (log-message "Started statistics report plugin..."))
(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index d6f77c3..c2fadc6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -265,7 +265,7 @@
https-need-client-auth
https-want-client-auth)
(config-filter server app filters-confs))})))
- (start-metrics-reporters)
+ (start-metrics-reporters conf)
(when handler-server
(.serve handler-server)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 66a1899..1fcb5d5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -1198,4 +1198,4 @@
STORM-VERSION
"'")
(start-logviewer! conf log-root daemonlog-root)
- (start-metrics-reporters)))
+ (start-metrics-reporters conf)))
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 587da65..9376d6e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1450,7 +1450,7 @@
(defgauge nimbus:num-supervisors
(fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
- (start-metrics-reporters)
+ (start-metrics-reporters conf)
(reify Nimbus$Iface
(^void submitTopologyWithOpts
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4bca23e..d27a609 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1196,7 +1196,7 @@
(let [supervisor (mk-supervisor conf nil supervisor)]
(add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))
(defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
- (start-metrics-reporters)))
+ (start-metrics-reporters conf)))
(defn standalone-supervisor []
(let [conf-atom (atom nil)
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj
b/storm-core/src/clj/org/apache/storm/ui/core.clj
index cbea15a..bd1dba5 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -1260,7 +1260,7 @@
https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
- (start-metrics-reporters)
+ (start-metrics-reporters conf)
(storm-run-jetty {:port (conf UI-PORT)
:host (conf UI-HOST)
:https-port https-port
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java
b/storm-core/src/jvm/org/apache/storm/Config.java
index f7f5169..bf50223 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,6 +140,13 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_META_SERIALIZATION_DELEGATE =
"storm.meta.serialization.delegate";
/**
+ * A list of statistics preparable reporter class names.
+ */
+ @NotNull
+ @isImplementationOfClass(implementsClass =
org.apache.storm.statistics.reporters.PreparableReporter.class)
+ public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN =
"storm.statistics.preparable.reporter.plugin";
+
+ /**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
@isStringList
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git
a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
new file mode 100644
index 0000000..19f7690
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -0,0 +1,26 @@
+package org.apache.storm.statistics;
+
+import org.apache.storm.Config;
+import org.apache.storm.statistics.reporters.JMXPreparableReporter;
+import org.apache.storm.statistics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class StatisticsUtils {
+ private final static Logger LOG =
LoggerFactory.getLogger(StatisticsUtils.class);
+
+ public static PreparableReporter getPreparableReporter(Map stormConf) {
+ PreparableReporter reporter = new JMXPreparableReporter();
+ String clazz = (String)
stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+ LOG.info("Using statistics reporter plugin:" + clazz);
+ if(clazz != null) {
+ reporter = (PreparableReporter) Utils.newInstance(clazz);
+ } else {
+ reporter = new JMXPreparableReporter();
+ }
+ return reporter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
----------------------------------------------------------------------
diff --git
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
new file mode 100644
index 0000000..5d94ffc
--- /dev/null
+++
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
@@ -0,0 +1,49 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JMXPreparableReporter implements PreparableReporter<JmxReporter> {
+ private final static Logger LOG =
LoggerFactory.getLogger(JMXPreparableReporter.class);
+
+ JmxReporter reporter = null;
+
+ @Override
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+ LOG.info("Preparing...");
+ JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+ String domain = Utils.getString(stormConf.get(":domain"), null);
+ if (domain != null) {
+ builder.inDomain(domain);
+ }
+ String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+ if (rateUnit != null) {
+ builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ }
+ MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+ if (filter != null) {
+ builder.filter(filter);
+ }
+ reporter = builder.build();
+
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting...");
+ reporter.start();
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping...");
+ reporter.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git
a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
new file mode 100644
index 0000000..f6e8b2b
--- /dev/null
+++
b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
@@ -0,0 +1,15 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+public interface PreparableReporter<T extends Reporter & Closeable> {
+ public abstract void prepare(MetricRegistry metricsRegistry, Map stormConf);
+ public abstract void start();
+ public abstract void stop();
+
+}