http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java new file mode 100644 index 0000000..bef9585 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.MetricRegistry; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsoleStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class); + + @Override + public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing ConsoleReporter"); + ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry); + + builder.outputTo(System.out); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(stormConf); + if (locale != null) { + builder.formattedFor(locale); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(stormConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(stormConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + reporter = builder.build(); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java new file mode 100644 index 0000000..c52cd2c --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.MetricRegistry; +import java.io.File; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CsvStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class); + + public static final String CSV_LOG_DIR = "csv.log.dir"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); + + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(reporterConf); + if (locale != null) { + builder.formatFor(locale); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + File csvMetricsDir = getCsvLogDir(stormConf, reporterConf); + reporter = builder.build(csvMetricsDir); + } + + + private static File getCsvLogDir(Map stormConf, Map reporterConf) { + String csvMetricsLogDirectory = ObjectReader.getString(reporterConf.get(CSV_LOG_DIR), null); + if (csvMetricsLogDirectory == null) { + csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf); + csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics"; + } + File csvMetricsDir = new File(csvMetricsLogDirectory); + validateCreateOutputDir(csvMetricsDir); + return csvMetricsDir; + } + + private static void validateCreateOutputDir(File dir) { + if (!dir.exists()) { + dir.mkdirs(); + } + if (!dir.canWrite()) { + throw new IllegalStateException(dir.getName() + " does not have write permissions."); + } + if (!dir.isDirectory()) { + throw new IllegalStateException(dir.getName() + " is not a directory."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java new file mode 100644 index 0000000..c9f3253 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ganglia.GangliaReporter; +import info.ganglia.gmetric4j.gmetric.GMetric; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GangliaStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + + public static final String GANGLIA_HOST = "ganglia.host"; + public static final String GANGLIA_PORT = "ganglia.port"; + public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; + public static final String GANGLIA_DMAX = "ganglia.dmax"; + public static final String GANGLIA_TMAX = "ganglia.tmax"; + public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; + public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; + public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; + public static final String GANGLIA_TTL = "ganglia.ttl"; + public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + Integer dmax = getGangliaDMax(reporterConf); + if (prefix != null) { + builder.withDMax(dmax); + } + + Integer tmax = getGangliaTMax(reporterConf); + if (prefix != null) { + builder.withTMax(tmax); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + String group = getMetricsTargetUdpGroup(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String udpAddressingMode = getMetricsTargetUdpAddressingMode(reporterConf); + Integer ttl = getMetricsTargetTtl(reporterConf); + + GMetric.UDPAddressingMode mode = udpAddressingMode.equalsIgnoreCase("multicast") + ? GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST; + + try { + GMetric sender = new GMetric(group, port, mode, ttl); + reporter = builder.build(sender); + } catch (IOException ioe) { + LOG.error("Exception in GangliaReporter config", ioe); + } + } + + + public static String getMetricsTargetUdpGroup(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_GROUP), null); + } + + public static String getMetricsTargetUdpAddressingMode(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null); + } + + public static Integer getMetricsTargetTtl(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_TTL), null); + } + + public static Integer getGangliaDMax(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_DMAX), null); + } + + public static Integer getGangliaTMax(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_TMAX), null); + } + + private static Integer getMetricsTargetPort(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_PORT), null); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java new file mode 100644 index 0000000..57eb5a4 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GraphiteStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); + + public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; + public static final String GRAPHITE_HOST = "graphite.host"; + public static final String GRAPHITE_PORT = "graphite.port"; + public static final String GRAPHITE_TRANSPORT = "graphite.transport"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + // Not exposed: + // * withClock(Clock) + + String host = getMetricsTargetHost(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String transport = getMetricsTargetTransport(reporterConf); + GraphiteSender sender = null; + if (transport.equalsIgnoreCase("udp")) { + sender = new GraphiteUDP(host, port); + } else { + sender = new Graphite(host, port); + } + reporter = builder.build(sender); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null); + } + + private static String getMetricsTargetHost(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_HOST), null); + } + + private static Integer getMetricsTargetPort(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GRAPHITE_PORT), null); + } + + private static String getMetricsTargetTransport(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java new file mode 100644 index 0000000..f995f90 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmxStormReporter implements StormReporter { + private static final Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class); + public static final String JMX_DOMAIN = "jmx.domain"; + JmxReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormConf, Map<String, Object> reporterConf) { + LOG.info("Preparing..."); + JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + String domain = getMetricsJmxDomain(reporterConf); + if (domain != null) { + builder.inDomain(domain); + } + + StormMetricsFilter filter = ScheduledStormReporter.getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + // other builder functions not exposed: + // * createsObjectNamesWith(ObjectNameFactory onFactory) + // * registerWith (MBeanServer) + // * specificDurationUnits (Map<String,TimeUnit> specificDurationUnits) + // * specificRateUnits(Map<String,TimeUnit> specificRateUnits) + + reporter = builder.build(); + } + + public static String getMetricsJmxDomain(Map reporterConf) { + return ObjectReader.getString(reporterConf, JMX_DOMAIN); + } + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java new file mode 100644 index 0000000..61af5be --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ScheduledStormReporter implements StormReporter { + private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); + protected ScheduledReporter reporter; + protected long reportingPeriod; + protected TimeUnit reportingPeriodUnit; + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(reportingPeriod, reportingPeriodUnit); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } + + + public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { + TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); + return unit == null ? TimeUnit.SECONDS : unit; + } + + private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { + String rateUnitString = ObjectReader.getString(reporterConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } + + public static long getReportPeriod(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); + } + + public static StormMetricsFilter getMetricsFilter(Map reporterConf) { + StormMetricsFilter filter = null; + Map<String, Object> filterConf = (Map)reporterConf.get("filter"); + if (filterConf != null) { + String clazz = (String) filterConf.get("class"); + if (clazz != null) { + filter = ReflectionUtils.newInstance(clazz); + filter.prepare(filterConf); + } + } + return filter; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java new file mode 100644 index 0000000..907965a --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { + String REPORT_PERIOD = "report.period"; + String REPORT_PERIOD_UNITS = "report.period.units"; + + void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, Map<String, Object> reporterConf); + + void start(); + + void stop(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index 78246bb..fbf1e92 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -15,44 +15,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import com.google.common.collect.Lists; - +import java.util.List; import org.apache.storm.generated.BoltStats; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; -import java.util.List; - @SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { - - MultiCountStatAndMetric ackedStats; - MultiCountStatAndMetric failedStats; MultiCountStatAndMetric executedStats; MultiLatencyStatAndMetric processLatencyStats; MultiLatencyStatAndMetric executeLatencyStats; public BoltExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); - this.failedStats = new MultiCountStatAndMetric(numStatBuckets); this.executedStats = new MultiCountStatAndMetric(numStatBuckets); this.processLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); this.executeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } - public MultiCountStatAndMetric getAcked() { - return ackedStats; - } - - public MultiCountStatAndMetric getFailed() { - return failedStats; - } - public MultiCountStatAndMetric getExecuted() { return executedStats; } @@ -67,8 +54,6 @@ public class BoltExecutorStats extends CommonStats { @Override public void cleanupStats() { - ackedStats.close(); - failedStats.close(); executedStats.close(); processLatencyStats.close(); executeLatencyStats.close(); @@ -81,16 +66,17 @@ public class BoltExecutorStats extends CommonStats { this.getExecuteLatencies().record(key, latencyMs); } - public void boltAckedTuple(String component, String stream, long latencyMs) { + public void boltAckedTuple(String component, String stream, long latencyMs, Counter ackedCounter) { List key = Lists.newArrayList(component, stream); this.getAcked().incBy(key, this.rate); + ackedCounter.inc(this.rate); this.getProcessLatencies().record(key, latencyMs); } - public void boltFailedTuple(String component, String stream, long latencyMs) { + public void boltFailedTuple(String component, String stream, long latencyMs, Counter failedCounter) { List key = Lists.newArrayList(component, stream); this.getFailed().incBy(key, this.rate); - + failedCounter.inc(this.rate); } @Override @@ -103,8 +89,8 @@ public class BoltExecutorStats extends CommonStats { // bolt stats BoltStats boltStats = new BoltStats( - StatsUtil.windowSetConverter(valueStat(ackedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(failedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(getAcked()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(getFailed()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java index b6461c5..4c95da0 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java @@ -15,8 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import java.util.Map; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; @@ -24,9 +26,10 @@ import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @SuppressWarnings("unchecked") public abstract class CommonStats { - private final MultiCountStatAndMetric emittedStats; private final MultiCountStatAndMetric transferredStats; + private final MultiCountStatAndMetric ackedStats; + private final MultiCountStatAndMetric failedStats; protected final int rate; @@ -34,6 +37,16 @@ public abstract class CommonStats { this.rate = rate; this.emittedStats = new MultiCountStatAndMetric(numStatBuckets); this.transferredStats = new MultiCountStatAndMetric(numStatBuckets); + this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); + this.failedStats = new MultiCountStatAndMetric(numStatBuckets); + } + + public MultiCountStatAndMetric getFailed() { + return failedStats; + } + + public MultiCountStatAndMetric getAcked() { + return ackedStats; } public int getRate() { @@ -48,17 +61,21 @@ public abstract class CommonStats { return transferredStats; } - public void emittedTuple(String stream) { + public void emittedTuple(String stream, Counter emittedCounter) { this.getEmitted().incBy(stream, this.rate); + emittedCounter.inc(this.rate); } - public void transferredTuples(String stream, int amount) { + public void transferredTuples(String stream, int amount, Counter transferredCounter) { this.getTransferred().incBy(stream, this.rate * amount); + transferredCounter.inc(amount); } public void cleanupStats() { emittedStats.close(); transferredStats.close(); + ackedStats.close(); + failedStats.close(); } protected Map<String,Map<String,Long>> valueStat(MultiCountStatAndMetric metric) { http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 6c3d589..2fc1502 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -15,55 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.generated.SpoutStats; -import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @SuppressWarnings("unchecked") public class SpoutExecutorStats extends CommonStats { - - private final MultiCountStatAndMetric ackedStats; - private final MultiCountStatAndMetric failedStats; private final MultiLatencyStatAndMetric completeLatencyStats; public SpoutExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); - this.failedStats = new MultiCountStatAndMetric(numStatBuckets); this.completeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } - public MultiCountStatAndMetric getAcked() { - return ackedStats; - } - - public MultiCountStatAndMetric getFailed() { - return failedStats; - } - public MultiLatencyStatAndMetric getCompleteLatencies() { return completeLatencyStats; } @Override public void cleanupStats() { - ackedStats.close(); - failedStats.close(); completeLatencyStats.close(); super.cleanupStats(); } - public void spoutAckedTuple(String stream, long latencyMs) { + public void spoutAckedTuple(String stream, long latencyMs, Counter ackedCounter) { this.getAcked().incBy(stream, this.rate); + ackedCounter.inc(this.rate); this.getCompleteLatencies().record(stream, latencyMs); } - public void spoutFailedTuple(String stream, long latencyMs) { + public void spoutFailedTuple(String stream, long latencyMs, Counter failedCounter) { this.getFailed().incBy(stream, this.rate); + failedCounter.inc(this.rate); } @Override @@ -76,7 +64,7 @@ public class SpoutExecutorStats extends CommonStats { // spout stats SpoutStats spoutStats = new SpoutStats( - valueStat(ackedStats), valueStat(failedStats), valueStat(completeLatencyStats)); + valueStat(getAcked()), valueStat(getFailed()), valueStat(completeLatencyStats)); ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java index 0a1765b..5985684 100644 --- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java +++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java @@ -15,30 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.task; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.NotImplementedException; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IReducer; -import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.ReducedMetric; -import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.state.ISubscribedState; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.NotImplementedException; import org.json.simple.JSONValue; /** @@ -168,9 +173,9 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } /** - * Gets the declared output fields for the specified stream id for the - * component this task is a part of. - */ + * Gets the declared output fields for the specified stream id for the + * component this task is a part of. + */ public Fields getThisOutputFields(String streamId) { return getComponentOutputFields(getThisComponentId(), streamId); } @@ -202,8 +207,8 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo public int getThisTaskIndex() { List<Integer> tasks = new ArrayList<>(getComponentTasks(getThisComponentId())); Collections.sort(tasks); - for(int i=0; i<tasks.size(); i++) { - if(tasks.get(i) == getThisTaskId()) { + for (int i=0; i<tasks.size(); i++) { + if (tasks.get(i) == getThisTaskId()) { return i; } } @@ -329,6 +334,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo * You must call this during `IBolt.prepare()` or `ISpout.open()`. * @return The IMetric argument unchanged. */ + @Deprecated public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) { if(_openOrPrepareWasCalled.get()) { throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + @@ -349,18 +355,18 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo } Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics; - if(!m1.containsKey(timeBucketSizeInSecs)) { + if (!m1.containsKey(timeBucketSizeInSecs)) { m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>()); } Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs); - if(!m2.containsKey(_taskId)) { + if (!m2.containsKey(_taskId)) { m2.put(_taskId, new HashMap<String, IMetric>()); } Map<String, IMetric> m3 = m2.get(_taskId); - if(m3.containsKey(name)) { - throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); + if (m3.containsKey(name)) { + throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } else { m3.put(name, metric); } @@ -375,6 +381,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo * cause the same metric name can register twice. * So we just return the first metric we meet. */ + @Deprecated public IMetric getRegisteredMetricByName(String name) { IMetric metric = null; @@ -395,13 +402,40 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo /* * Convenience method for registering ReducedMetric. */ + @Deprecated public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } + /* * Convenience method for registering CombinedMetric. */ + @Deprecated public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + + public Timer registerTimer(String name) { + return StormMetricRegistry.registry().timer(metricName(name)); + } + + public Histogram registerHistogram(String name) { + return StormMetricRegistry.registry().histogram(metricName(name)); + } + + public Meter registerMeter(String name) { + return StormMetricRegistry.registry().meter(metricName(name)); + } + + public Counter registerCounter(String name) { + return StormMetricRegistry.registry().counter(metricName(name)); + } + + public Gauge registerGauge(String name, Gauge gauge) { + return StormMetricRegistry.registry().register(metricName(name), gauge); + } + + private String metricName(String name) { + return StormMetricRegistry.metricName(name, this); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index 7a64812..3212df0 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -18,27 +18,34 @@ package org.apache.storm.utils; -import org.apache.storm.policy.IWaitStrategy; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.internal.RateTracker; +import org.apache.storm.metrics2.JcMetrics; +import org.apache.storm.metrics2.StormMetricRegistry; +import org.apache.storm.policy.IWaitStrategy; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscArrayQueue; import org.jctools.queues.MpscUnboundedArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - - public class JCQueue implements IStatefulObject { private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class); + private static final String PREFIX = "jc-"; + private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(PREFIX + "metrics-reporter").build()); public static final Object INTERRUPT = new Object(); private final ExitCondition continueRunning = () -> true; + private final JcMetrics jcMetrics; private interface Inserter { // blocking call that can be interrupted using Thread.interrupt() @@ -64,7 +71,7 @@ public class JCQueue implements IStatefulObject { int idleCount = 0; while (!inserted) { q.metrics.notifyInsertFailure(); - if (idleCount==0) { // check avoids multiple log msgs when in a idle loop + if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName()); } @@ -90,7 +97,6 @@ public class JCQueue implements IStatefulObject { @Override public void flush() throws InterruptedException { - return; } @Override @@ -111,7 +117,7 @@ public class JCQueue implements IStatefulObject { this.currentBatch = new ArrayList<>(batchSz + 1); } - /** Blocking call - retires till element is successfully added */ + /** Blocking call - retires till element is successfully added. */ @Override public void publish(Object obj) throws InterruptedException { currentBatch.add(obj); @@ -143,7 +149,7 @@ public class JCQueue implements IStatefulObject { int retryCount = 0; while (publishCount == 0) { // retry till at least 1 element is drained q.metrics.notifyInsertFailure(); - if (retryCount==0) { // check avoids multiple log msgs when in a idle loop + if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", q.getName()); } retryCount = q.backPressureWaitStrategy.idle(retryCount); @@ -236,7 +242,10 @@ public class JCQueue implements IStatefulObject { } private final MpscArrayQueue<Object> recvQueue; - private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full + + // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full + private final MpscUnboundedArrayQueue<Object> overflowQ; + private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow. @@ -250,17 +259,28 @@ public class JCQueue implements IStatefulObject { private String queueName; private final IWaitStrategy backPressureWaitStrategy; - public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy) { + public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy, + String topologyId, String componentId, Integer taskId, int port) { this.queueName = queueName; this.overflowLimit = overflowLimit; this.recvQueue = new MpscArrayQueue<>(size); this.overflowQ = new MpscUnboundedArrayQueue<>(size); this.metrics = new JCQueue.QueueMetrics(); + this.jcMetrics = StormMetricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port); //The batch size can be no larger than half the full recvQueue size, to avoid contention issues. this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2)); this.backPressureWaitStrategy = backPressureWaitStrategy; + + if (!METRICS_REPORTER_EXECUTOR.isShutdown()) { + METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + jcMetrics.set(metrics); + } + }, 15, 15, TimeUnit.SECONDS); + } } public String getName() { @@ -271,6 +291,7 @@ public class JCQueue implements IStatefulObject { public boolean haltWithInterrupt() { boolean res = tryPublishInternal(INTERRUPT); metrics.close(); + METRICS_REPORTER_EXECUTOR.shutdown(); return res; } @@ -294,15 +315,18 @@ public class JCQueue implements IStatefulObject { } } - public int size() { return recvQueue.size() + overflowQ.size(); } + public int size() { + return recvQueue.size() + overflowQ.size(); + } /** * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q + * @param consumer * @param exitCond */ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException { int drainCount = 0; - while ( exitCond.keepRunning() ) { + while (exitCond.keepRunning()) { Object tuple = recvQueue.poll(); if (tuple == null) { break; @@ -374,7 +398,7 @@ public class JCQueue implements IStatefulObject { } /** - * Non-blocking call, returns false if full + * Non-blocking call, returns false if full. **/ public boolean tryPublish(Object obj) { Inserter inserter = getInserter(); @@ -391,7 +415,7 @@ public class JCQueue implements IStatefulObject { * returns false if overflowLimit has reached */ public boolean tryPublishToOverflow(Object obj) { - if (overflowLimit>0 && overflowQ.size() >= overflowLimit) { + if (overflowLimit > 0 && overflowQ.size() >= overflowLimit) { return false; } overflowQ.add(obj); http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java index 34adbc4..cb99870 100644 --- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -46,9 +46,11 @@ import org.slf4j.LoggerFactory; public class ConfigValidation { private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class); - public static abstract class Validator { + public abstract static class Validator { public Validator(Map<String, Object> params) {} + public Validator() {} + public abstract void validateField(String name, Object o); } @@ -57,7 +59,7 @@ public class ConfigValidation { */ /** - * Validates if an object is not null + * Validates if an object is not null. */ public static class NotNullValidator extends Validator { @@ -71,7 +73,7 @@ public class ConfigValidation { } /** - * Validates basic types + * Validates basic types. */ public static class SimpleTypeValidator extends Validator { @@ -93,7 +95,8 @@ public class ConfigValidation { if (type.isInstance(o)) { return; } - throw new IllegalArgumentException("Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass()); + throw new IllegalArgumentException( + "Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass()); } } @@ -137,9 +140,10 @@ public class ConfigValidation { public StringValidator(Map<String, Object> params) { - this.acceptedValues = new HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES))); + this.acceptedValues = + new HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES))); - if(this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) { + if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) { this.acceptedValues = null; } } @@ -147,7 +151,7 @@ public class ConfigValidation { @Override public void validateField(String name, Object o) { SimpleTypeValidator.validateField(name, String.class, o); - if(this.acceptedValues != null) { + if (this.acceptedValues != null) { if (!this.acceptedValues.contains((String) o)) { throw new IllegalArgumentException("Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues); } @@ -194,8 +198,8 @@ public class ConfigValidation { return; } final long i; - if (o instanceof Number && - (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { + if (o instanceof Number + && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) { return; } @@ -205,7 +209,7 @@ public class ConfigValidation { } /** - * Validates an entry for ImpersonationAclUser + * Validates an entry for ImpersonationAclUser. */ public static class ImpersonationAclUserEntryValidator extends Validator { @@ -214,7 +218,8 @@ public class ConfigValidation { if (o == null) { return; } - ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false), + ConfigValidationUtils.NestableFieldValidator validator = + ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false), ConfigValidationUtils.listFv(String.class, false), false); validator.validateField(name, o); @SuppressWarnings("unchecked") @@ -229,7 +234,7 @@ public class ConfigValidation { } /** - * validates a list of has no duplicates + * validates a list of has no duplicates. */ public static class NoDuplicateInListValidator extends Validator { @@ -251,7 +256,7 @@ public class ConfigValidation { } /** - * Validates a String or a list of Strings + * Validates a String or a list of Strings. */ public static class StringOrStringListValidator extends Validator { @@ -269,7 +274,7 @@ public class ConfigValidation { } /** - * Validates Kryo Registration + * Validates Kryo Registration. */ public static class KryoRegValidator extends Validator { @@ -283,8 +288,8 @@ public class ConfigValidation { for (Object e : (Iterable<?>) o) { if (e instanceof Map) { for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) e).entrySet()) { - if (!(entry.getKey() instanceof String) || - !(entry.getValue() instanceof String)) { + if (!(entry.getKey() instanceof String) + || !(entry.getValue() instanceof String)) { throw new IllegalArgumentException( "Each element of the list " + name + " must be a String or a Map of Strings"); } @@ -302,7 +307,7 @@ public class ConfigValidation { } /** - * Validates if a number is a power of 2 + * Validates if a number is a power of 2. */ public static class PowerOf2Validator extends Validator { @@ -312,8 +317,8 @@ public class ConfigValidation { return; } final long i; - if (o instanceof Number && - (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { + if (o instanceof Number + && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { // Test whether the integer is a power of 2. if (i > 0 && (i & (i - 1)) == 0) { return; @@ -324,7 +329,7 @@ public class ConfigValidation { } /** - * Validates each entry in a list + * Validates each entry in a list. */ public static class ListEntryTypeValidator extends Validator { @@ -346,10 +351,10 @@ public class ConfigValidation { } /** - * Validates each entry in a list against a list of custom Validators + * Validates each entry in a list against a list of custom Validators. * Each validator in the list of validators must inherit or be an instance of Validator class */ - public static class ListEntryCustomValidator extends Validator{ + public static class ListEntryCustomValidator extends Validator { private Class<?>[] entryValidators; @@ -366,7 +371,8 @@ public class ConfigValidation { } } - public static void validateField(String name, Class<?>[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + public static void validateField(String name, Class<?>[] validators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; } @@ -386,9 +392,9 @@ public class ConfigValidation { } /** - * validates each key and value in a map of a certain type + * validates each key and value in a map of a certain type. */ - public static class MapEntryTypeValidator extends Validator{ + public static class MapEntryTypeValidator extends Validator { private Class<?> keyType; private Class<?> valueType; @@ -410,9 +416,9 @@ public class ConfigValidation { } /** - * validates each key and each value against the respective arrays of validators + * validates each key and each value against the respective arrays of validators. */ - public static class MapEntryCustomValidator extends Validator{ + public static class MapEntryCustomValidator extends Validator { private Class<?>[] keyValidators; private Class<?>[] valueValidators; @@ -432,7 +438,8 @@ public class ConfigValidation { } @SuppressWarnings("unchecked") - public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; } @@ -460,9 +467,9 @@ public class ConfigValidation { } /** - * Validates a positive number + * Validates a positive number. */ - public static class PositiveNumberValidator extends Validator{ + public static class PositiveNumberValidator extends Validator { private boolean includeZero; @@ -484,7 +491,7 @@ public class ConfigValidation { return; } if (o instanceof Number) { - if(includeZero) { + if (includeZero) { if (((Number) o).doubleValue() >= 0.0) { return; } @@ -518,14 +525,14 @@ public class ConfigValidation { @Override public void validateField(String name, Object o) { - if(o == null) { + if (o == null) { return; } SimpleTypeValidator.validateField(name, Map.class, o); - if(!((Map<?, ?>) o).containsKey("class") ) { - throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + if (!((Map<?, ?>) o).containsKey("class")) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); } - if(!((Map<?, ?>) o).containsKey("parallelism.hint") ) { + if (!((Map<?, ?>) o).containsKey("parallelism.hint")) { throw new IllegalArgumentException("Field " + name + " must have map entry with key: parallelism.hint"); } @@ -534,21 +541,65 @@ public class ConfigValidation { } } + public static class MetricReportersValidator extends Validator { + private static final String NIMBUS = "nimbus"; + private static final String SUPERVISOR = "supervisor"; + private static final String WORKER = "worker"; + private static final String CLASS = "class"; + private static final String FILTER = "filter"; + private static final String DAEMONS = "daemons"; + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if (!((Map) o).containsKey(CLASS)) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); + } + if (!((Map) o).containsKey(DAEMONS)) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); + } else { + // daemons can only be 'nimbus', 'supervisor', or 'worker' + Object list = ((Map)o).get(DAEMONS); + if (!(list instanceof List)) { + throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); + } + List daemonList = (List)list; + for (Object string : daemonList) { + if (string instanceof String + && (string.equals(NIMBUS) || string.equals(SUPERVISOR) || string.equals(WORKER))) { + continue; + } + throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" + + " \"nimbus\", \"supervisor\", or \"worker\""); + } + + } + if (((Map)o).containsKey(FILTER)) { + Map filterMap = (Map)((Map)o).get(FILTER); + SimpleTypeValidator.validateField(CLASS, String.class, filterMap.get(CLASS)); + } + SimpleTypeValidator.validateField(name, String.class, ((Map) o).get(CLASS)); + } + } + public static class EventLoggerRegistryValidator extends Validator { @Override public void validateField(String name, Object o) { - if(o == null) { + if (o == null) { return; } SimpleTypeValidator.validateField(name, Map.class, o); - if(!((Map<?, ?>) o).containsKey("class") ) { - throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + if (!((Map<?, ?>) o).containsKey("class")) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); } SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class")); - if(((Map<?, ?>) o).containsKey("arguments") ) { + if (((Map<?, ?>) o).containsKey("arguments")) { SimpleTypeValidator.validateField(name, Map.class, ((Map<?, ?>) o).get("arguments")); } } @@ -570,10 +621,9 @@ public class ConfigValidation { throw new IllegalArgumentException("Field " + name + " must be set."); } - if (o instanceof String && - (((String) o).equals("NONE") || - ((String) o).equals("DIGEST") || - ((String) o).equals("KERBEROS"))) { + if (o instanceof String + && (((String) o).equals("NONE") || ((String) o).equals("DIGEST") + || ((String) o).equals("KERBEROS"))) { return; } throw new IllegalArgumentException("Field " + name + " must be one of \"NONE\", \"DIGEST\", or \"KERBEROS\""); @@ -684,9 +734,9 @@ public class ConfigValidation { private static List<Class<?>> configClasses = null; //We follow the model of service loaders (Even though it is not a service). - private static final String CONFIG_CLASSES_NAME = "META-INF/services/"+Validated.class.getName(); + private static final String CONFIG_CLASSES_NAME = "META-INF/services/" + Validated.class.getName(); - private synchronized static List<Class<?>> getConfigClasses() { + private static synchronized List<Class<?>> getConfigClasses() { if (configClasses == null) { List<Class<?>> ret = new ArrayList<>(); Set<String> classesToScan = new HashSet<>(); @@ -695,7 +745,7 @@ public class ConfigValidation { try { try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) { String line; - while((line = in.readLine()) != null) { + while ((line = in.readLine()) != null) { line = line.replaceAll("#.*$", "").trim(); if (!line.isEmpty()) { classesToScan.add(line); @@ -720,7 +770,7 @@ public class ConfigValidation { } /** - * Validates a field given field name as string + * Validates a field given field name as string. * * @param fieldName provided as a string * @param conf map of confs @@ -783,9 +833,7 @@ public class ConfigValidation { //If validator has a constructor that takes a Map as an argument call that constructor if (hasConstructor(clazz, Map.class)) { o = clazz.getConstructor(Map.class).newInstance(params); - } - //If not call default constructor - else { + } else { //If not call default constructor o = clazz.newInstance(); } o.validateField(field.getName(), conf.get(key)); @@ -797,7 +845,7 @@ public class ConfigValidation { } /** - * Validate all confs in map + * Validate all confs in map. * * @param conf map of configs */ @@ -811,15 +859,15 @@ public class ConfigValidation { private static final int ACC_STATIC = 0x0008; private static final int ACC_FINAL = 0x0010; private static final int DESIRED_FIELD_ACC = ACC_PUBLIC | ACC_STATIC | ACC_FINAL; + public static boolean isFieldAllowed(Field field) { - return field.getAnnotation(NotConf.class) == null && - String.class.equals(field.getType()) && - ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) && - !field.isSynthetic(); + return field.getAnnotation(NotConf.class) == null + && String.class.equals(field.getType()) + && ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) && !field.isSynthetic(); } /** - * Validate all confs in map + * Validate all confs in map. * * @param conf map of configs * @param classes config class @@ -848,9 +896,10 @@ public class ConfigValidation { } } - private static Map<String,Object> getParamsFromAnnotation(Class<?> validatorClass, Object v) throws InvocationTargetException, IllegalAccessException { + private static Map<String,Object> getParamsFromAnnotation(Class<?> validatorClass, Object v) + throws InvocationTargetException, IllegalAccessException { Map<String, Object> params = new HashMap<String, Object>(); - for(Method method : validatorClass.getDeclaredMethods()) { + for (Method method : validatorClass.getDeclaredMethods()) { Object value = null; try { @@ -858,7 +907,7 @@ public class ConfigValidation { } catch (IllegalArgumentException ex) { value = null; } - if(value != null) { + if (value != null) { params.put(method.getName(), value); } } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java index 29ba179..e48d090 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java @@ -46,7 +46,7 @@ public class JCQueueBackpressureTest extends TestCase { } private static JCQueue createQueue(String name, int queueSize) { - return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0)); + return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test",1000, 1000); } private static class TestConsumer implements Consumer { http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java index bdc4937..3d48de2 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java @@ -200,10 +200,10 @@ public class JCQueueTest { } private JCQueue createQueue(String name, int queueSize) { - return new JCQueue(name, queueSize, 0, 1, waitStrategy); + return createQueue(name, 1, queueSize); } private JCQueue createQueue(String name, int batchSize, int queueSize) { - return new JCQueue(name, queueSize, 0, batchSize, waitStrategy); + return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test",1000, 1000); } } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index ec1815d..c6cff6b 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -18,19 +18,22 @@ package org.apache.storm; +import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger; +import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.isString; import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList; import static org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList; -import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; -import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; -import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass; import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType; import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList; import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom; +import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; + +import java.util.ArrayList; +import java.util.Map; import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; @@ -42,9 +45,6 @@ import org.apache.storm.security.auth.IAuthorizer; import org.apache.storm.validation.ConfigValidation; import org.apache.storm.validation.Validated; -import java.util.ArrayList; -import java.util.Map; - /** * Storm configs are specified as a plain old map. This class provides constants for * all the configurations possible on a Storm cluster. Each constant is paired with an annotation @@ -69,32 +69,12 @@ public class DaemonConfig implements Validated { public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins"; /** - * A specify Locale for daemon metrics reporter plugin. - * Use the specified IETF BCP 47 language tag string for a Locale. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale"; - - /** * A specify domain for daemon metrics reporter plugin to limit reporting to specific domain. */ @isString public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain"; /** - * A specify rate-unit in TimeUnit to specify reporting frequency for daemon metrics reporter plugin. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit"; - - /** - * A specify duration-unit in TimeUnit to specify reporting window for daemon metrics reporter plugin. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit"; - - - /** * A specify csv reporter directory for CvsPreparableReporter daemon metrics reporter. */ @isString @@ -303,7 +283,7 @@ public class DaemonConfig implements Validated { public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = "nimbus.assignments.service.thread.queue.size"; /** - * class controls heartbeats recovery strategy + * class controls heartbeats recovery strategy. */ @isString public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class"; @@ -829,7 +809,8 @@ public class DaemonConfig implements Validated { */ @NotNull @isPositiveNumber - public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs"; + public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = + "storm.cluster.metrics.consumer.publish.interval.secs"; /** * Enables user-first classpath. See topology.classpath.beginning. @@ -863,8 +844,8 @@ public class DaemonConfig implements Validated { /** * For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. - * If it is a directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of - * the real URI to use ArtifactoryConfigLoader. + * If it is a directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" + * to the beginning of the real URI to use ArtifactoryConfigLoader. * For FileConfigLoader, this is the URI pointing to a file. */ @isString http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java index b5bd8bc..85ee27f 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java @@ -15,8 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; @@ -26,15 +32,8 @@ import org.apache.storm.utils.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - public class MetricsUtils { - private final static Logger LOG = LoggerFactory.getLogger(MetricsUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsUtils.class); public static List<PreparableReporter> getPreparableReporters(Map<String, Object> topoConf) { List<String> clazzes = (List<String>) topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGINS); @@ -60,30 +59,6 @@ public class MetricsUtils { return reporter; } - public static Locale getMetricsReporterLocale(Map<String, Object> topoConf) { - String languageTag = ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null); - if (languageTag != null) { - return Locale.forLanguageTag(languageTag); - } - return null; - } - - public static TimeUnit getMetricsRateUnit(Map<String, Object> topoConf) { - return getTimeUnitForCofig(topoConf, DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT); - } - - public static TimeUnit getMetricsDurationUnit(Map<String, Object> topoConf) { - return getTimeUnitForCofig(topoConf, DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT); - } - - private static TimeUnit getTimeUnitForCofig(Map<String, Object> topoConf, String configName) { - String rateUnitString = ObjectReader.getString(topoConf.get(configName), null); - if (rateUnitString != null) { - return TimeUnit.valueOf(rateUnitString); - } - return null; - } - public static File getCsvLogDir(Map<String, Object> topoConf) { String csvMetricsLogDirectory = ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null); if (csvMetricsLogDirectory == null) { http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java index bc34b4b..cd6d069 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java @@ -15,20 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> { - private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); ConsoleReporter reporter = null; @Override @@ -37,17 +37,17 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry); builder.outputTo(System.out); - Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf); if (locale != null) { builder.formattedFor(locale); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf); + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(topoConf); if (durationUnit != null) { builder.convertDurationsTo(durationUnit); } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java index b67cc65..3ab6a99 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java @@ -15,21 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.CsvReporter; import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CsvPreparableReporter implements PreparableReporter<CsvReporter> { - private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); CsvReporter reporter = null; @Override @@ -37,17 +38,17 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> { LOG.debug("Preparing..."); CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); - Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf); if (locale != null) { builder.formatFor(locale); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf); + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(topoConf); if (durationUnit != null) { builder.convertDurationsTo(durationUnit); } http://git-wip-us.apache.org/repos/asf/storm/blob/21da5dff/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java index 21aab16..c369879 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java @@ -15,21 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.storm.DaemonConfig; -import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.TimeUnit; - public class JmxPreparableReporter implements PreparableReporter<JmxReporter> { - private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); JmxReporter reporter = null; @Override @@ -40,7 +40,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> { if (domain != null) { builder.inDomain(domain); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); }