implement metric filters
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8de0f36 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8de0f36 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8de0f36 Branch: refs/heads/1.x-branch Commit: b8de0f365a81a8ba3eadec466d777e47dacfcb44 Parents: 6eaa1a8 Author: P. Taylor Goetz <[email protected]> Authored: Wed Aug 30 16:14:32 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Wed Aug 30 16:14:32 2017 -0400 ---------------------------------------------------------------------- conf/defaults.yaml | 46 +++++----- .../apache/storm/metrics2/Metrics2Utils.java | 28 +++++++ .../storm/metrics2/StormMetricRegistry.java | 20 ++--- .../storm/metrics2/filters/RegexFilter.java | 47 +++++++++++ .../metrics2/filters/StormMetricsFilter.java | 32 +++++++ .../reporters/ConsoleStormReporter.java | 9 +- .../metrics2/reporters/CsvStormReporter.java | 9 +- .../reporters/GangliaStormReporter.java | 8 +- .../reporters/GraphiteStormReporter.java | 8 +- .../metrics2/reporters/JmxStormReporter.java | 6 +- .../reporters/ScheduledStormReporter.java | 88 ++++++++++++++++++++ .../reporters/SheduledStormReporter.java | 71 ---------------- 12 files changed, 259 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b01e0b7..e51b50c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -295,26 +295,26 @@ storm.daemon.metrics.reporter.plugins: storm.cluster.metrics.consumer.publish.interval.secs: 60 -storm.metrics.reporters: - # Graphite Reporter - - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" - daemons: - - "supervisor" - - "nimbus" - - "worker" - report.period: 60 - report.period.units: "SECONDS" - 
graphite.host: "localhost" - graphite.port: 2003 - - # Console Reporter - - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" - daemons: - - "worker" - report.period: 10 - report.period.units: "SECONDS" - - #TODO: not funtional, but you get the idea - filters: - "org.apache.storm.metrics2.filters.RegexFilter": - expression: ".*my_component.*emitted.*" +# Metrics v2 configuration (optional) +#storm.metrics.reporters: +# # Graphite Reporter +# - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" +# daemons: +# - "supervisor" +# - "nimbus" +# - "worker" +# report.period: 60 +# report.period.units: "SECONDS" +# graphite.host: "localhost" +# graphite.port: 2003 +# +# # Console Reporter +# - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" +# daemons: +# - "worker" +# report.period: 10 +# report.period.units: "SECONDS" +# +# filter: +# class: "org.apache.storm.metrics2.filters.RegexFilter" +# expression: ".*my_component.*emitted.*" http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java b/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java new file mode 100644 index 0000000..716b8b7 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +public class Metrics2Utils { + private Metrics2Utils(){} + + public static Object instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + Class<?> c = Class.forName(klass); + return c.newInstance(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index 4c975a3..a3b0db9 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -86,12 +86,14 @@ public class StormMetricRegistry { LOG.info("Starting metrics reporters..."); List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); - for(Map<String, Object> reporterConfig : reporterList){ - // only start those requested - List<String> daemons = (List<String>)reporterConfig.get("daemons"); - for(String daemon : daemons){ - if(DaemonType.valueOf(daemon.toUpperCase()) == type){ - startReporter(stormConfig, reporterConfig); + if(reporterList != null && reporterList.size() > 0) { + for (Map<String, Object> reporterConfig : reporterList) { + // only start those requested + List<String> daemons = (List<String>) reporterConfig.get("daemons"); + for 
(String daemon : daemons) { + if (DaemonType.valueOf(daemon.toUpperCase()) == type) { + startReporter(stormConfig, reporterConfig); + } } } } @@ -106,7 +108,7 @@ public class StormMetricRegistry { StormReporter reporter = null; LOG.info("Attempting to instantiate reporter class: {}", clazz); try{ - reporter = instantiate(clazz); + reporter = (StormReporter)Metrics2Utils.instantiate(clazz); } catch(Exception e){ LOG.warn("Unable to instantiate metrics reporter class: {}. Will skip this reporter.", clazz, e); } @@ -118,10 +120,6 @@ public class StormMetricRegistry { } - private static StormReporter instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException { - Class<?> c = Class.forName(klass); - return (StormReporter) c.newInstance(); - } public static void stop(){ for(StormReporter sr : REPORTERS){ http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java b/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java new file mode 100644 index 0000000..e6997c6 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.Metric; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexFilter implements StormMetricsFilter { + + private Pattern pattern; + + + @Override + public void prepare(Map<String, Object> config) { + String expression = (String) config.get("expression"); + if(expression != null){ + this.pattern = Pattern.compile(expression); + } else { + throw new IllegalStateException("RegexFilter requires an 'expression' parameter."); + } + } + + @Override + public boolean matches(String name, Metric metric) { + Matcher matcher = this.pattern.matcher(name); + return matcher.matches(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java new file mode 100644 index 0000000..57f7255 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.MetricFilter; + +import java.util.Map; + +public interface StormMetricsFilter extends MetricFilter { + + /** + * Called after the filter is instantiated. + * @param config an arbitrary configuration map pulled from the yaml configuration. + */ + void prepare(Map<String, Object> config); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java index 5322bf8..abb5226 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java @@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; -public class 
ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> { +public class ConsoleStormReporter extends ScheduledStormReporter<ConsoleReporter> { private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class); @Override @@ -51,6 +52,12 @@ public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> builder.convertDurationsTo(durationUnit); } + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if(filter != null){ + builder.filter(filter); + } + + //defaults to 10 reportingPeriod = getReportPeriod(reporterConf); http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java index 4225b7c..24c6eed 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java @@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters; import com.codahale.metrics.CsvReporter; import com.codahale.metrics.MetricRegistry; import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; @@ -30,7 +31,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; -public class CsvStormReporter extends SheduledStormReporter<CsvReporter> { +public class CsvStormReporter extends ScheduledStormReporter<CsvReporter> { private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class); public static final String CSV_LOG_DIR = "csv.log.dir"; @@ -55,7 +56,11 @@ public class CsvStormReporter extends 
SheduledStormReporter<CsvReporter> { builder.convertDurationsTo(durationUnit); } - //TODO: expose some simple MetricFilters + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if(filter != null){ + builder.filter(filter); + } + //defaults to 10 reportingPeriod = getReportPeriod(reporterConf); http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java index d8d0269..e7dc5f4 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java @@ -21,6 +21,7 @@ import com.codahale.metrics.ganglia.GangliaReporter; import com.codahale.metrics.MetricRegistry; import info.ganglia.gmetric4j.gmetric.GMetric; import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +30,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; -public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> { +public class GangliaStormReporter extends ScheduledStormReporter<GangliaReporter> { private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); public static final String GANGLIA_HOST = "ganglia.host"; @@ -58,7 +59,10 @@ public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> builder.convertRatesTo(rateUnit); } - //TODO: expose some simple MetricFilters + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if(filter != null){ + builder.filter(filter); + } String prefix = 
getMetricsPrefixedWith(reporterConf); if (prefix != null) { builder.prefixedWith(prefix); http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java index 7a2b31b..0f88fc4 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java @@ -23,6 +23,7 @@ import com.codahale.metrics.graphite.GraphiteUDP; import com.codahale.metrics.graphite.Graphite; import com.codahale.metrics.MetricRegistry; import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.TimeUnit; -public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporter> { +public class GraphiteStormReporter extends ScheduledStormReporter<GraphiteReporter> { private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; @@ -53,7 +54,10 @@ public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporte builder.convertRatesTo(rateUnit); } - //TODO: expose some simple MetricFilters + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if(filter != null){ + builder.filter(filter); + } String prefix = getMetricsPrefixedWith(reporterConf); if (prefix != null) { builder.prefixedWith(prefix); 
http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java index 7ac6cde..5b932ea 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java @@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,10 @@ public class JmxStormReporter implements StormReporter<JmxReporter> { builder.inDomain(domain); } - // TODO: expose some simple MetricFilters + StormMetricsFilter filter = ScheduledStormReporter.getMetricsFilter(reporterConf); + if(filter != null){ + builder.filter(filter); + } // other builder functions not exposed: // * createsObjectNamesWith(ObjectNameFactory onFactory) // * registerWith (MBeanServer) http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java new file mode 100644 index 0000000..940cb19 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.metrics2.Metrics2Utils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduledStormReporter<T extends ScheduledReporter> implements StormReporter{ + private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); + protected ScheduledReporter reporter; + long reportingPeriod; + TimeUnit reportingPeriodUnit; + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(reportingPeriod, reportingPeriodUnit); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } + + + static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { + TimeUnit unit = 
getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); + return unit == null ? TimeUnit.SECONDS : unit; + } + + private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { + String rateUnitString = Utils.getString(reporterConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } + + static long getReportPeriod(Map reporterConf) { + return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); + } + + /** + * Builds the optional metrics filter described by the "filter" section of a reporter configuration. + * @param reporterConf the reporter configuration map; its "filter" entry, when present, is a map whose "class" entry names a StormMetricsFilter implementation. + * @return the instantiated and prepared filter, or null when no filter is configured or it could not be created. + */ + static StormMetricsFilter getMetricsFilter(Map reporterConf){ + Map<String, Object> filterConf = (Map)reporterConf.get("filter"); + // the "filter" section is optional, so only look up "class" when it is present + String clazz = filterConf == null ? null : (String) filterConf.get("class"); + if(clazz != null){ + try { + StormMetricsFilter filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz); + filter.prepare(filterConf); + // hand the filter out only once prepare() has succeeded + return filter; + } catch (Exception e) { + LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz, e); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java deleted file mode 100644 index 1b1e7a0..0000000 --- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.metrics2.reporters; - -import com.codahale.metrics.ScheduledReporter; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public abstract class SheduledStormReporter<T extends ScheduledReporter> implements StormReporter{ - private static final Logger LOG = LoggerFactory.getLogger(SheduledStormReporter.class); - protected ScheduledReporter reporter; - long reportingPeriod; - TimeUnit reportingPeriodUnit; - - @Override - public void start() { - if (reporter != null) { - LOG.debug("Starting..."); - reporter.start(reportingPeriod, reportingPeriodUnit); - } else { - throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); - } - } - - @Override - public void stop() { - if (reporter != null) { - LOG.debug("Stopping..."); - reporter.stop(); - } else { - throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); - } - } - - - static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { - TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); - return unit == null ? 
TimeUnit.SECONDS : unit; - } - - private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { - String rateUnitString = Utils.getString(reporterConf.get(configName), null); - if (rateUnitString != null) { - return TimeUnit.valueOf(rateUnitString); - } - return null; - } - - static long getReportPeriod(Map reporterConf) { - return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); - } -}
