Repository: storm
Updated Branches:
  refs/heads/master d42276f6f -> f325febd2


STORM-1723 Introduce ClusterMetricsConsumer

* ClusterMetricsConsumer publishes cluster-side metrics to consumers
  * like MetricsConsumer does for topology metrics
* Users can implement IClusterMetricsConsumer and register it in the cluster
conf. file for it to take effect
  * Please refer to conf/storm.yaml.example for more details on configuring
  * Nimbus should be launched with the additional jars which are needed by
IClusterMetricsConsumer implementations
* Also refactored parts of nimbus.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1362c0bc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1362c0bc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1362c0bc

Branch: refs/heads/master
Commit: 1362c0bc623118e377da57bfd6f42ce52b7b81a1
Parents: a9ef86d
Author: Jungtaek Lim <[email protected]>
Authored: Fri Jun 3 13:28:49 2016 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Sun Jun 12 21:58:35 2016 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   3 +
 conf/storm.yaml.example                         |   9 +
 log4j2/cluster.xml                              |  14 ++
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 192 ++++++++++++-------
 storm-core/src/jvm/org/apache/storm/Config.java |  16 ++
 .../metric/ClusterMetricsConsumerExecutor.java  |  70 +++++++
 .../metric/LoggingClusterMetricsConsumer.java   |  86 +++++++++
 .../org/apache/storm/metric/api/DataPoint.java  |  43 +++++
 .../metric/api/IClusterMetricsConsumer.java     |  63 ++++++
 .../storm/metric/api/IMetricsConsumer.java      |   2 +
 .../storm/validation/ConfigValidation.java      |  16 ++
 11 files changed, 449 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 150a236..22e0ba5 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -287,6 +287,9 @@ pacemaker.kerberos.users: []
 storm.daemon.metrics.reporter.plugins:
      - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
 
+# configuration of cluster metrics consumer
+storm.cluster.metrics.consumer.publish.interval.secs: 60
+
 storm.resource.isolation.plugin: 
"org.apache.storm.container.cgroup.CgroupManager"
 # Also determines whether the unit tests for cgroup runs.  
 # If storm.resource.isolation.plugin.enable is set to false the unit tests for 
cgroups will not run

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 7df3e9d..17589f1 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -46,3 +46,12 @@
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"
+
+## Cluster Metrics Consumers
+# storm.cluster.metrics.consumer.register:
+#   - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
+#   - class: "org.mycompany.MyMetricsConsumer"
+#     argument:
+#       - endpoint: "metrics-collector.mycompany.org"
+#
+# storm.cluster.metrics.consumer.publish.interval.secs: 60
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/log4j2/cluster.xml
----------------------------------------------------------------------
diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml
index eddfae7..e911823 100644
--- a/log4j2/cluster.xml
+++ b/log4j2/cluster.xml
@@ -54,6 +54,17 @@
         </Policies>
         <DefaultRolloverStrategy max="9"/>
     </RollingFile>
+    <RollingFile name="METRICS"
+                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
+                 
filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
+        <PatternLayout>
+            <pattern>${patternMetrics}</pattern>
+        </PatternLayout>
+        <Policies>
+            <SizeBasedTriggeringPolicy size="2 MB"/>
+        </Policies>
+        <DefaultRolloverStrategy max="9"/>
+    </RollingFile>
     <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" 
port="514"
             protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" 
includeMDC="true"
             facility="LOCAL5" enterpriseNumber="18060" newLine="true" 
exceptionPattern="%rEx{full}"
@@ -69,6 +80,9 @@
         <AppenderRef ref="THRIFT-ACCESS"/>
         <AppenderRef ref="syslog"/>
     </Logger>
+    <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" 
level="info" additivity="false">
+        <appender-ref ref="METRICS"/>
+    </Logger>
     <root level="info"> <!-- We log everything -->
         <appender-ref ref="A1"/>
         <appender-ref ref="syslog"/>

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 2adadf9..ea1de6e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -54,12 +54,16 @@
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils])
   (:use [org.apache.storm util config log converter])
   (:require [org.apache.storm [converter :as converter]])
+  (:require [org.apache.storm.ui.core :as ui])
   (:require [clojure.set :as set])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
   (:import [org.apache.storm.zookeeper Zookeeper])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm config])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
+  (:import [org.apache.storm.metric ClusterMetricsConsumerExecutor]
+           [org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo 
DataPoint IClusterMetricsConsumer$SupervisorInfo]
+           [org.apache.storm Config])
   (:import [org.apache.storm.utils VersionInfo LocalState]
            [org.json.simple JSONValue])
   (:require [clj-time.core :as time])
@@ -167,6 +171,13 @@
         (catch Exception e
           (log-warn-error e "Ingoring exception, Could not initialize " (conf 
NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))))))
 
+(defn mk-cluster-metrics-consumer-executors [storm-conf]
+  (map
+    (fn [consumer]
+      (ClusterMetricsConsumerExecutor. (get consumer "class")
+                                       (get consumer "argument")))
+    (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
+
 (defn nimbus-data [conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf conf
@@ -209,6 +220,7 @@
      :topo-history-state (ConfigUtils/nimbusTopoHistoryState conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
      :nimbus-topology-action-notifier (create-tology-action-notifier conf)
+     :cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf)
      }))
 
 (defn inbox [nimbus]
@@ -1358,13 +1370,42 @@
   (and (>= val lower)
     (<= val upper)))
 
-;TODO: when translating this function, you should replace the map-val with a 
proper for loop HERE
-(defserverfn service-handler [conf inimbus]
-  (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
-  (log-message "Starting Nimbus with conf " conf)
-  (let [nimbus (nimbus-data conf inimbus)
-        blob-store (:blob-store nimbus)
-        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+(defn extract-cluster-metrics [^ClusterSummary summ]
+  (let [cluster-summ (ui/cluster-summary summ "nimbus")]
+    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. 
(System/currentTimeMillis))
+     :data-points  (map
+                     (fn [[k v]] (DataPoint. k v))
+                     (select-keys cluster-summ ["supervisors" "topologies" 
"slotsTotal" "slotsUsed" "slotsFree"
+                                                "executorsTotal" 
"tasksTotal"]))}))
+(defn extract-supervisors-metrics [^ClusterSummary summ]
+  (let [sups (.get_supervisors summ)
+        supervisors-summ ((ui/supervisor-summary sups) "supervisors")]
+    (map (fn [supervisor-summ]
+           {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
+                               (supervisor-summ "host")
+                               (supervisor-summ "id")
+                               (System/currentTimeMillis))
+            :data-points     (map
+                               (fn [[k v]] (DataPoint. k v))
+                               (select-keys supervisor-summ ["slotsTotal" 
"slotsUsed" "totalMem" "totalCpu"
+                                                             "usedMem" 
"usedCpu"]))})
+         supervisors-summ)))
+
+(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
+  (let [cluster-summary (.getClusterInfo nimbus-service)
+        cluster-metrics (extract-cluster-metrics cluster-summary)
+        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
+    (dofor
+      [consumer-executor (:cluster-consumer-executors nimbus)]
+      (do
+        (.handleDataPoints consumer-executor (:cluster-info cluster-metrics) 
(:data-points cluster-metrics))
+        (dofor
+          [supervisor-metrics supervisors-metrics]
+          (do
+            (.handleDataPoints consumer-executor (:supervisor-info 
supervisor-metrics) (:data-points supervisor-metrics))))))))
+
+(defn mk-reified-nimbus [nimbus conf blob-store]
+  (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
         admin-users (or (.get conf NIMBUS-ADMINS) [])
         get-common-topo-info
           (fn [^String storm-id operation]
@@ -1400,64 +1441,6 @@
                            (doto (ErrorInfo. (:error e) (:time-secs e))
                              (.set_host (:host e))
                              (.set_port (:port e)))))]
-    (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) 
conf)
-
-    ;add to nimbuses
-    (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString 
(:nimbus-host-port-info nimbus))
-      (NimbusSummary.
-        (.getHost (:nimbus-host-port-info nimbus))
-        (.getPort (:nimbus-host-port-info nimbus))
-        (Time/currentTimeSecs)
-        false ;is-leader
-        STORM-VERSION))
-
-    (.addToLeaderLockQueue (:leader-elector nimbus))
-    (cleanup-corrupt-topologies! nimbus)
-    (when (instance? LocalFsBlobStore blob-store)
-      ;register call back for blob-store
-      (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf 
nimbus)))
-      (setup-blobstore nimbus))
-
-    (when (is-leader nimbus :throw-exception false)
-      (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
-        (transition! nimbus storm-id :startup)))
-
-    (.scheduleRecurring (:timer nimbus)
-      0
-      (conf NIMBUS-MONITOR-FREQ-SECS)
-      (fn []
-        (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-          (locking (:submit-lock nimbus)
-            (mk-assignments nimbus)))
-        (do-cleanup nimbus)))
-    ;; Schedule Nimbus inbox cleaner
-    (.scheduleRecurring (:timer nimbus)
-      0
-      (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-      (fn [] (clean-inbox (inbox nimbus) (conf 
NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
-    ;; Schedule nimbus code sync thread to sync code from other nimbuses.
-    (if (instance? LocalFsBlobStore blob-store)
-      (.scheduleRecurring (:timer nimbus)
-        0
-        (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-        (fn [] (blob-sync conf nimbus))))
-    ;; Schedule topology history cleaner
-    (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-      (.scheduleRecurring (:timer nimbus)
-        0
-        (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-        (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) 
nimbus))))
-    (.scheduleRecurring (:timer nimbus)
-      0
-      (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-      (fn []
-        (renew-credentials nimbus)))
-
-    (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge 
"nimbus:num-supervisors"
-      (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))
-
-    (StormMetricsRegistry/startMetricsReporters conf)
-    
     (reify Nimbus$Iface
       (^void submitTopologyWithOpts
         [this ^String storm-name ^String uploadedJarLocation ^String 
serializedConf ^StormTopology topology
@@ -2195,6 +2178,85 @@
       (isWaiting [this]
         (.isTimerWaiting (:timer nimbus))))))
 
+;TODO: when translating this function, you should replace the map-val with a 
proper for loop HERE
+(defserverfn service-handler [conf inimbus]
+  (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
+  (log-message "Starting Nimbus with conf " conf)
+  (let [nimbus (nimbus-data conf inimbus)
+        blob-store (:blob-store nimbus)]
+    (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) 
conf)
+
+    ;add to nimbuses
+    (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString 
(:nimbus-host-port-info nimbus))
+      (NimbusSummary.
+        (.getHost (:nimbus-host-port-info nimbus))
+        (.getPort (:nimbus-host-port-info nimbus))
+        (Time/currentTimeSecs)
+        false ;is-leader
+        STORM-VERSION))
+
+    (.addToLeaderLockQueue (:leader-elector nimbus))
+    (cleanup-corrupt-topologies! nimbus)
+    (when (instance? LocalFsBlobStore blob-store)
+      ;register call back for blob-store
+      (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf 
nimbus)))
+      (setup-blobstore nimbus))
+
+    (doseq [consumer (:cluster-consumer-executors nimbus)]
+      (.prepare consumer))
+
+    (when (is-leader nimbus :throw-exception false)
+      (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
+        (transition! nimbus storm-id :startup)))
+
+    (.scheduleRecurring (:timer nimbus)
+      0
+      (conf NIMBUS-MONITOR-FREQ-SECS)
+      (fn []
+        (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+          (locking (:submit-lock nimbus)
+            (mk-assignments nimbus)))
+        (do-cleanup nimbus)))
+    ;; Schedule Nimbus inbox cleaner
+    (.scheduleRecurring (:timer nimbus)
+      0
+      (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+      (fn [] (clean-inbox (inbox nimbus) (conf 
NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+    ;; Schedule nimbus code sync thread to sync code from other nimbuses.
+    (if (instance? LocalFsBlobStore blob-store)
+      (.scheduleRecurring (:timer nimbus)
+        0
+        (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+        (fn [] (blob-sync conf nimbus))))
+    ;; Schedule topology history cleaner
+    (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
+      (.scheduleRecurring (:timer nimbus)
+        0
+        (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
+        (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) 
nimbus))))
+    (.scheduleRecurring (:timer nimbus)
+      0
+      (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+      (fn []
+        (renew-credentials nimbus)))
+
+    (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge 
"nimbus:num-supervisors"
+      (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))
+
+    (StormMetricsRegistry/startMetricsReporters conf)
+
+    (let [reified-inimbus (mk-reified-nimbus nimbus conf blob-store)]
+      (do
+        (if (:cluster-consumer-executors nimbus)
+          (.scheduleRecurring (:timer nimbus)
+            0
+            (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
+            (fn []
+              (when (is-leader nimbus :throw-exception false)
+                (send-cluster-metrics-to-executors reified-inimbus nimbus))))))
+      reified-inimbus)))
+
+
 (defn validate-port-available[conf]
   (try
     (let [socket (ServerSocket. (conf NIMBUS-THRIFT-PORT))]

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 73809f1..ba8b82e 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1520,6 +1520,22 @@ public class Config extends HashMap<String, Object> {
     public static final String 
BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
 
     /**
+     * A list of classes implementing IClusterMetricsConsumer (See 
storm.yaml.example for exact config format).
+     * Each listed class will be routed cluster related metrics data.
+     * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and 
they're executed in Nimbus.
+     * Only consumers which run in the leader Nimbus receive metrics data.
+     */
+    @isListEntryCustom(entryValidatorClasses = 
{ClusterMetricRegistryValidator.class})
+    public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = 
"storm.cluster.metrics.consumer.register";
+
+    /**
+     * How often, in seconds, cluster metrics data is published to the metrics consumers.
+     */
+    @NotNull
+    @isPositiveNumber
+    public static final String 
STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = 
"storm.cluster.metrics.consumer.publish.interval.secs";
+
+    /**
      * A list of users that are allowed to interact with the topology.  To use 
this set
      * nimbus.authorizer to 
org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
new file mode 100644
index 0000000..1102ded
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+public class ClusterMetricsConsumerExecutor {
+    public static final Logger LOG = 
LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
+
+    private IClusterMetricsConsumer metricsConsumer;
+    private String consumerClassName;
+    private Object registrationArgument;
+
+    public ClusterMetricsConsumerExecutor(String consumerClassName, Object 
registrationArgument) {
+        this.consumerClassName = consumerClassName;
+        this.registrationArgument = registrationArgument;
+    }
+
+    public void prepare() {
+        try {
+            metricsConsumer = 
(IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate a class listed 
in config under section " +
+                    Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER + " with 
fully qualified name " + consumerClassName, e);
+        }
+
+        metricsConsumer.prepare(registrationArgument);
+    }
+
+    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo 
clusterInfo, final Collection<DataPoint> dataPoints) {
+        try {
+            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
+        } catch (Throwable e) {
+            LOG.error("Error while handling cluster data points, consumer 
class: " + consumerClassName, e);
+        }
+    }
+
+    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo 
supervisorInfo, final Collection<DataPoint> dataPoints) {
+        try {
+            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
+        } catch (Throwable e) {
+            LOG.error("Error while handling cluster data points, consumer 
class: " + consumerClassName, e);
+        }
+    }
+
+    public void cleanup() {
+        metricsConsumer.cleanup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
new file mode 100644
index 0000000..ac2b743
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Listens for cluster related metrics and dumps them to the log.
+ *
+ * To use, edit the storm.yaml config file:
+ *
+ * ```yaml
+ *   storm.cluster.metrics.consumer.register:
+ *     - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
+ * ```
+ *
+ */
+public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
+    public static final Logger LOG = 
LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);
+
+    static private String padding = "                       ";
+
+    @Override
+    public void prepare(Object registrationArgument) {
+    }
+
+    @Override
+    public void handleDataPoints(ClusterInfo clusterInfo, 
Collection<DataPoint> dataPoints) {
+        StringBuilder sb = new StringBuilder();
+        String header = String.format("%d\t%15s\t%40s\t",
+                clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
+        sb.append(header);
+        logDataPoints(dataPoints, sb, header);
+    }
+
+    @Override
+    public void handleDataPoints(SupervisorInfo supervisorInfo, 
Collection<DataPoint> dataPoints) {
+        StringBuilder sb = new StringBuilder();
+        String header = String.format("%d\t%15s\t%40s\t",
+                supervisorInfo.getTimestamp(),
+                supervisorInfo.getSrcSupervisorHost(),
+                supervisorInfo.getSrcSupervisorId());
+        sb.append(header);
+        for (DataPoint p : dataPoints) {
+            sb.delete(header.length(), sb.length());
+            sb.append(p.getName())
+                    
.append(padding).delete(header.length()+23,sb.length()).append("\t")
+                    .append(p.getValue());
+            LOG.info(sb.toString());
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder 
sb, String header) {
+        for (DataPoint p : dataPoints) {
+            sb.delete(header.length(), sb.length());
+            sb.append(p.getName())
+                    
.append(padding).delete(header.length()+23,sb.length()).append("\t")
+                    .append(p.getValue());
+            LOG.info(sb.toString());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
new file mode 100644
index 0000000..45e077a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.api;
+
+public class DataPoint {
+    private String name;
+    private Object value;
+
+    public DataPoint() {}
+
+    public DataPoint(String name, Object value) {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + name + " = " + value + "]";
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
new file mode 100644
index 0000000..d18aa04
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.api;
+
+import java.util.Collection;
+
+public interface IClusterMetricsConsumer {
+    class ClusterInfo {
+        private long timestamp;
+
+        public ClusterInfo(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    class SupervisorInfo {
+        private String srcSupervisorHost;
+        private String srcSupervisorId;
+        private long timestamp;
+
+        public SupervisorInfo(String srcSupervisorHost, String 
srcSupervisorId, long timestamp) {
+            this.srcSupervisorHost = srcSupervisorHost;
+            this.srcSupervisorId = srcSupervisorId;
+            this.timestamp = timestamp;
+        }
+
+        public String getSrcSupervisorHost() {
+            return srcSupervisorHost;
+        }
+
+        public String getSrcSupervisorId() {
+            return srcSupervisorId;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    void prepare(Object registrationArgument);
+    void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> 
dataPoints);
+    void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> 
dataPoints);
+    void cleanup();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
index f475659..fe9b7f9 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
@@ -40,6 +40,8 @@ public interface IMetricsConsumer {
         public long timestamp;
         public int updateIntervalSecs; 
     }
+
+    // We can't move this class out of IMetricsConsumer without breaking backward compatibility.
     public static class DataPoint {
         public DataPoint() {}
         public DataPoint(String name, Object value) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java 
b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index edff5cf..f4500b4 100644
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -457,6 +457,22 @@ public class ConfigValidation {
         }
     }
 
+    public static class ClusterMetricRegistryValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            if (!((Map) o).containsKey("class")) {
+                throw new IllegalArgumentException("Field " + name + " must 
have map entry with key: class");
+            }
+
+            SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
+        }
+    }
+
     public static class MetricRegistryValidator extends Validator {
 
         @Override

Reply via email to