Repository: storm Updated Branches: refs/heads/master d42276f6f -> f325febd2
STORM-1723 Introduce ClusterMetricsConsumer * ClusterMetricsConsumer publishes cluster-side related metrics into consumers * like MetricsConsumer for topology metrics * Users can implement IClusterMetricsConsumer and configure to cluster conf. file to take effect * Please refer conf/storm.yaml.example for more details on configuring * Nimbus should be launched with additional jars which are needed for IClusterMetricsConsumer * Also did some refactor to nimbus.clj Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1362c0bc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1362c0bc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1362c0bc Branch: refs/heads/master Commit: 1362c0bc623118e377da57bfd6f42ce52b7b81a1 Parents: a9ef86d Author: Jungtaek Lim <[email protected]> Authored: Fri Jun 3 13:28:49 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sun Jun 12 21:58:35 2016 +0900 ---------------------------------------------------------------------- conf/defaults.yaml | 3 + conf/storm.yaml.example | 9 + log4j2/cluster.xml | 14 ++ .../src/clj/org/apache/storm/daemon/nimbus.clj | 192 ++++++++++++------- storm-core/src/jvm/org/apache/storm/Config.java | 16 ++ .../metric/ClusterMetricsConsumerExecutor.java | 70 +++++++ .../metric/LoggingClusterMetricsConsumer.java | 86 +++++++++ .../org/apache/storm/metric/api/DataPoint.java | 43 +++++ .../metric/api/IClusterMetricsConsumer.java | 63 ++++++ .../storm/metric/api/IMetricsConsumer.java | 2 + .../storm/validation/ConfigValidation.java | 16 ++ 11 files changed, 449 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 150a236..22e0ba5 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -287,6 +287,9 @@ pacemaker.kerberos.users: [] storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" +# configuration of cluster metrics consumer +storm.cluster.metrics.consumer.publish.interval.secs: 60 + storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager" # Also determines whether the unit tests for cgroup runs. # If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/conf/storm.yaml.example ---------------------------------------------------------------------- diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 7df3e9d..17589f1 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -46,3 +46,12 @@ # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" + +## Cluster Metrics Consumers +# storm.cluster.metrics.consumer.register: +# - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer" +# - class: "org.mycompany.MyMetricsConsumer" +# argument: +# - endpoint: "metrics-collector.mycompany.org" +# +# storm.cluster.metrics.consumer.publish.interval.secs: 60 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/log4j2/cluster.xml ---------------------------------------------------------------------- diff --git a/log4j2/cluster.xml b/log4j2/cluster.xml index eddfae7..e911823 100644 --- a/log4j2/cluster.xml +++ b/log4j2/cluster.xml @@ -54,6 +54,17 @@ </Policies> <DefaultRolloverStrategy max="9"/> </RollingFile> + <RollingFile name="METRICS" + fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics" + filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz"> + <PatternLayout> + <pattern>${patternMetrics}</pattern> + </PatternLayout> + <Policies> + <SizeBasedTriggeringPolicy size="2 MB"/> + </Policies> + <DefaultRolloverStrategy max="9"/> + </RollingFile> <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514" protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true" facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}" @@ -69,6 +80,9 @@ <AppenderRef ref="THRIFT-ACCESS"/> <AppenderRef ref="syslog"/> </Logger> + <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false"> + <appender-ref ref="METRICS"/> + </Logger> <root level="info"> <!-- We log everything --> <appender-ref ref="A1"/> <appender-ref ref="syslog"/> http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 2adadf9..ea1de6e 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -54,12 +54,16 @@ (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) (:use [org.apache.storm util config log converter]) (:require [org.apache.storm [converter :as converter]]) + (:require [org.apache.storm.ui.core :as ui]) (:require [clojure.set :as set]) (:import [org.apache.storm.daemon.common StormBase Assignment]) (:import [org.apache.storm.zookeeper Zookeeper]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm config]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) + (:import [org.apache.storm.metric ClusterMetricsConsumerExecutor] + [org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo] + [org.apache.storm Config]) (:import [org.apache.storm.utils VersionInfo LocalState] [org.json.simple JSONValue]) (:require [clj-time.core :as time]) @@ -167,6 +171,13 @@ (catch Exception e (log-warn-error e "Ingoring exception, Could not initialize " (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))))))) +(defn mk-cluster-metrics-consumer-executors [storm-conf] + (map + (fn [consumer] + (ClusterMetricsConsumerExecutor. (get consumer "class") + (get consumer "argument"))) + (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER))) + (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf @@ -209,6 +220,7 @@ :topo-history-state (ConfigUtils/nimbusTopoHistoryState conf) :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf) :nimbus-topology-action-notifier (create-tology-action-notifier conf) + :cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf) })) (defn inbox [nimbus] @@ -1358,13 +1370,42 @@ (and (>= val lower) (<= val upper))) -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defserverfn service-handler [conf inimbus] - (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf)) - (log-message "Starting Nimbus with conf " conf) - (let [nimbus (nimbus-data conf inimbus) - blob-store (:blob-store nimbus) - principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf) +(defn extract-cluster-metrics [^ClusterSummary summ] + (let [cluster-summ (ui/cluster-summary summ "nimbus")] + {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (System/currentTimeMillis)) + :data-points (map + (fn [[k v]] (DataPoint. k v)) + (select-keys cluster-summ ["supervisors" "topologies" "slotsTotal" "slotsUsed" "slotsFree" + "executorsTotal" "tasksTotal"]))})) +(defn extract-supervisors-metrics [^ClusterSummary summ] + (let [sups (.get_supervisors summ) + supervisors-summ ((ui/supervisor-summary sups) "supervisors")] + (map (fn [supervisor-summ] + {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo. + (supervisor-summ "host") + (supervisor-summ "id") + (System/currentTimeMillis)) + :data-points (map + (fn [[k v]] (DataPoint. k v)) + (select-keys supervisor-summ ["slotsTotal" "slotsUsed" "totalMem" "totalCpu" + "usedMem" "usedCpu"]))}) + supervisors-summ))) + +(defn send-cluster-metrics-to-executors [nimbus-service nimbus] + (let [cluster-summary (.getClusterInfo nimbus-service) + cluster-metrics (extract-cluster-metrics cluster-summary) + supervisors-metrics (extract-supervisors-metrics cluster-summary)] + (dofor + [consumer-executor (:cluster-consumer-executors nimbus)] + (do + (.handleDataPoints consumer-executor (:cluster-info cluster-metrics) (:data-points cluster-metrics)) + (dofor + [supervisor-metrics supervisors-metrics] + (do + (.handleDataPoints consumer-executor (:supervisor-info supervisor-metrics) (:data-points supervisor-metrics)))))))) + +(defn mk-reified-nimbus [nimbus conf blob-store] + (let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf) admin-users (or (.get conf NIMBUS-ADMINS) []) get-common-topo-info (fn [^String storm-id operation] @@ -1400,64 +1441,6 @@ (doto (ErrorInfo. (:error e) (:time-secs e)) (.set_host (:host e)) (.set_port (:port e)))))] - (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf) - - ;add to nimbuses - (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus)) - (NimbusSummary. - (.getHost (:nimbus-host-port-info nimbus)) - (.getPort (:nimbus-host-port-info nimbus)) - (Time/currentTimeSecs) - false ;is-leader - STORM-VERSION)) - - (.addToLeaderLockQueue (:leader-elector nimbus)) - (cleanup-corrupt-topologies! nimbus) - (when (instance? LocalFsBlobStore blob-store) - ;register call back for blob-store - (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus))) - (setup-blobstore nimbus)) - - (when (is-leader nimbus :throw-exception false) - (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))] - (transition! nimbus storm-id :startup))) - - (.scheduleRecurring (:timer nimbus) - 0 - (conf NIMBUS-MONITOR-FREQ-SECS) - (fn [] - (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN) - (locking (:submit-lock nimbus) - (mk-assignments nimbus))) - (do-cleanup nimbus))) - ;; Schedule Nimbus inbox cleaner - (.scheduleRecurring (:timer nimbus) - 0 - (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) - (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))) - ;; Schedule nimbus code sync thread to sync code from other nimbuses. - (if (instance? LocalFsBlobStore blob-store) - (.scheduleRecurring (:timer nimbus) - 0 - (conf NIMBUS-CODE-SYNC-FREQ-SECS) - (fn [] (blob-sync conf nimbus)))) - ;; Schedule topology history cleaner - (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] - (.scheduleRecurring (:timer nimbus) - 0 - (conf LOGVIEWER-CLEANUP-INTERVAL-SECS) - (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))) - (.scheduleRecurring (:timer nimbus) - 0 - (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS) - (fn [] - (renew-credentials nimbus))) - - (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors" - (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))) - - (StormMetricsRegistry/startMetricsReporters conf) - (reify Nimbus$Iface (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology @@ -2195,6 +2178,85 @@ (isWaiting [this] (.isTimerWaiting (:timer nimbus)))))) +;TODO: when translating this function, you should replace the map-val with a proper for loop HERE +(defserverfn service-handler [conf inimbus] + (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf)) + (log-message "Starting Nimbus with conf " conf) + (let [nimbus (nimbus-data conf inimbus) + blob-store (:blob-store nimbus)] + (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf) + + ;add to nimbuses + (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus)) + (NimbusSummary. + (.getHost (:nimbus-host-port-info nimbus)) + (.getPort (:nimbus-host-port-info nimbus)) + (Time/currentTimeSecs) + false ;is-leader + STORM-VERSION)) + + (.addToLeaderLockQueue (:leader-elector nimbus)) + (cleanup-corrupt-topologies! nimbus) + (when (instance? LocalFsBlobStore blob-store) + ;register call back for blob-store + (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus))) + (setup-blobstore nimbus)) + + (doseq [consumer (:cluster-consumer-executors nimbus)] + (.prepare consumer)) + + (when (is-leader nimbus :throw-exception false) + (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))] + (transition! nimbus storm-id :startup))) + + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-MONITOR-FREQ-SECS) + (fn [] + (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN) + (locking (:submit-lock nimbus) + (mk-assignments nimbus))) + (do-cleanup nimbus))) + ;; Schedule Nimbus inbox cleaner + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) + (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))) + ;; Schedule nimbus code sync thread to sync code from other nimbuses. + (if (instance? LocalFsBlobStore blob-store) + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CODE-SYNC-FREQ-SECS) + (fn [] (blob-sync conf nimbus)))) + ;; Schedule topology history cleaner + (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] + (.scheduleRecurring (:timer nimbus) + 0 + (conf LOGVIEWER-CLEANUP-INTERVAL-SECS) + (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))) + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS) + (fn [] + (renew-credentials nimbus))) + + (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors" + (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))) + + (StormMetricsRegistry/startMetricsReporters conf) + + (let [reified-inimbus (mk-reified-nimbus nimbus conf blob-store)] + (do + (if (:cluster-consumer-executors nimbus) + (.scheduleRecurring (:timer nimbus) + 0 + (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS) + (fn [] + (when (is-leader nimbus :throw-exception false) + (send-cluster-metrics-to-executors reified-inimbus nimbus)))))) + reified-inimbus))) + + (defn validate-port-available[conf] (try (let [socket (ServerSocket. (conf NIMBUS-THRIFT-PORT))] http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 73809f1..ba8b82e 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1520,6 +1520,22 @@ public class Config extends HashMap<String, Object> { public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark"; /** + * A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format). + * Each listed class will be routed cluster related metrics data. + * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus. + * Only consumers which run in leader Nimbus receives metrics data. + */ + @isListEntryCustom(entryValidatorClasses = {ClusterMetricRegistryValidator.class}) + public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = "storm.cluster.metrics.consumer.register"; + + /** + * How often cluster metrics data is published to metrics consumer. + */ + @NotNull + @isPositiveNumber + public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs"; + + /** * A list of users that are allowed to interact with the topology. To use this set * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer */ http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java new file mode 100644 index 0000000..1102ded --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric; + +import org.apache.storm.Config; +import org.apache.storm.metric.api.DataPoint; +import org.apache.storm.metric.api.IClusterMetricsConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +public class ClusterMetricsConsumerExecutor { + public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class); + + private IClusterMetricsConsumer metricsConsumer; + private String consumerClassName; + private Object registrationArgument; + + public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) { + this.consumerClassName = consumerClassName; + this.registrationArgument = registrationArgument; + } + + public void prepare() { + try { + metricsConsumer = (IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate a class listed in config under section " + + Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER + " with fully qualified name " + consumerClassName, e); + } + + metricsConsumer.prepare(registrationArgument); + } + + public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) { + try { + metricsConsumer.handleDataPoints(clusterInfo, dataPoints); + } catch (Throwable e) { + LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e); + } + } + + public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) { + try { + metricsConsumer.handleDataPoints(supervisorInfo, dataPoints); + } catch (Throwable e) { + LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e); + } + } + + public void cleanup() { + metricsConsumer.cleanup(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java new file mode 100644 index 0000000..ac2b743 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric; + +import org.apache.storm.metric.api.DataPoint; +import org.apache.storm.metric.api.IClusterMetricsConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +/** + * Listens for cluster related metrics, dumps them to log + * + * To use, edit the storm.yaml config file: + * + * ```yaml + * storm.cluster.metrics.register: + * - class: "org.apache.storm.metrics.LoggingClusterMetricsConsumer" + * ``` + * + */ +public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { + public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class); + + static private String padding = " "; + + @Override + public void prepare(Object registrationArgument) { + } + + @Override + public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { + StringBuilder sb = new StringBuilder(); + String header = String.format("%d\t%15s\t%40s\t", + clusterInfo.getTimestamp(), "<cluster>", "<cluster>"); + sb.append(header); + logDataPoints(dataPoints, sb, header); + } + + @Override + public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { + StringBuilder sb = new StringBuilder(); + String header = String.format("%d\t%15s\t%40s\t", + supervisorInfo.getTimestamp(), + supervisorInfo.getSrcSupervisorHost(), + supervisorInfo.getSrcSupervisorId()); + sb.append(header); + for (DataPoint p : dataPoints) { + sb.delete(header.length(), sb.length()); + sb.append(p.getName()) + .append(padding).delete(header.length()+23,sb.length()).append("\t") + .append(p.getValue()); + LOG.info(sb.toString()); + } + } + + @Override + public void cleanup() { + } + + private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) { + for (DataPoint p : dataPoints) { + sb.delete(header.length(), sb.length()); + sb.append(p.getName()) + .append(padding).delete(header.length()+23,sb.length()).append("\t") + .append(p.getValue()); + LOG.info(sb.toString()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java new file mode 100644 index 0000000..45e077a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.api; + +public class DataPoint { + private String name; + private Object value; + + public DataPoint() {} + + public DataPoint(String name, Object value) { + this.name = name; + this.value = value; + } + + @Override + public String toString() { + return "[" + name + " = " + value + "]"; + } + + public String getName() { + return name; + } + + public Object getValue() { + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java new file mode 100644 index 0000000..d18aa04 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.api; + +import java.util.Collection; + +public interface IClusterMetricsConsumer { + class ClusterInfo { + private long timestamp; + + public ClusterInfo(long timestamp) { + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + } + + class SupervisorInfo { + private String srcSupervisorHost; + private String srcSupervisorId; + private long timestamp; + + public SupervisorInfo(String srcSupervisorHost, String srcSupervisorId, long timestamp) { + this.srcSupervisorHost = srcSupervisorHost; + this.srcSupervisorId = srcSupervisorId; + this.timestamp = timestamp; + } + + public String getSrcSupervisorHost() { + return srcSupervisorHost; + } + + public String getSrcSupervisorId() { + return srcSupervisorId; + } + + public long getTimestamp() { + return timestamp; + } + } + + void prepare(Object registrationArgument); + void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints); + void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints); + void cleanup(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java index f475659..fe9b7f9 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java +++ b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java @@ -40,6 +40,8 @@ public interface IMetricsConsumer { public long timestamp; public int updateIntervalSecs; } + + // We can't move this to outside without breaking backward compatibility. public static class DataPoint { public DataPoint() {} public DataPoint(String name, Object value) { http://git-wip-us.apache.org/repos/asf/storm/blob/1362c0bc/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java index edff5cf..f4500b4 100644 --- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -457,6 +457,22 @@ public class ConfigValidation { } } + public static class ClusterMetricRegistryValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if (!((Map) o).containsKey("class")) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); + } + + SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); + } + } + public static class MetricRegistryValidator extends Validator { @Override
