Repository: flink Updated Branches: refs/heads/master c0199f5d1 -> 56017a98f
[FLINK-7502][metrics] Improve PrometheusReporter * Do not throw exception when same metric is added twice * Add possibility to configure port range * Bump prometheus.version 0.0.21 -> 0.0.26 * Use simpleclient_httpserver instead of nanohttpd * guard gauge report against null * guard close() vs NPE This closes #4586. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56017a98 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56017a98 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56017a98 Branch: refs/heads/master Commit: 56017a98fa61fdfae1c8dadd90a378ffdb3fea72 Parents: c0199f5 Author: Maximilian Bode <[email protected]> Authored: Thu Aug 24 15:59:24 2017 +0200 Committer: zentol <[email protected]> Committed: Thu Oct 26 09:48:21 2017 +0200 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 6 +- flink-metrics/flink-metrics-prometheus/pom.xml | 19 +- .../metrics/prometheus/PrometheusReporter.java | 229 +++++++++++-------- .../PrometheusReporterTaskScopeTest.java | 188 +++++++++++++++ .../prometheus/PrometheusReporterTest.java | 185 +++++++++++---- pom.xml | 2 +- 6 files changed, 477 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index e191101..64d7318 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -424,7 +424,7 @@ of your Flink distribution. Parameters: -- `port` - (optional) the port the Prometheus exporter listens on, defaults to [9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations). +- `port` - (optional) the port the Prometheus exporter listens on, defaults to [9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations). In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like `9250-9260`. Example configuration: @@ -440,11 +440,11 @@ Flink metric types are mapped to Prometheus metric types as follows: | Flink | Prometheus | Note | | --------- |------------|------------------------------------------| | Counter | Gauge |Prometheus counters cannot be decremented.| -| Gauge | Gauge | | +| Gauge | Gauge |Only numbers and booleans are supported. | | Histogram | Summary |Quantiles .5, .75, .95, .98, .99 and .999 | | Meter | Gauge |The gauge exports the meter's rate. | -All Flink metrics variables, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>` and `<operator_name>`, are exported to Prometheus as labels. +All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter) http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml index f8f2eea..0e9b261 100644 --- a/flink-metrics/flink-metrics-prometheus/pom.xml +++ b/flink-metrics/flink-metrics-prometheus/pom.xml @@ -42,6 +42,13 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> @@ -62,16 +69,10 @@ under the License. <dependency> <groupId>io.prometheus</groupId> - <artifactId>simpleclient_servlet</artifactId> + <artifactId>simpleclient_httpserver</artifactId> <version>${prometheus.version}</version> </dependency> - <dependency> - <groupId>org.nanohttpd</groupId> - <artifactId>nanohttpd</artifactId> - <version>2.2.0</version> - </dependency> - <!-- test dependencies --> <dependency> @@ -114,10 +115,6 @@ under the License. <pattern>io.prometheus.client</pattern> <shadedPattern>org.apache.flink.shaded.io.prometheus.client</shadedPattern> </relocation> - <relocation> - <pattern>fi.iki.elonen</pattern> - <shadedPattern>org.apache.flink.shaded.fi.iki.elonen</shadedPattern> - </relocation> </relocations> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java index d23be8c..1e44ab9 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java @@ -24,7 +24,6 @@ import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; @@ -32,20 +31,21 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; +import org.apache.flink.util.NetUtils; -import fi.iki.elonen.NanoHTTPD; import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.exporter.common.TextFormat; +import io.prometheus.client.exporter.HTTPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.StringWriter; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -59,7 +59,7 @@ public class PrometheusReporter implements MetricReporter { private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class); static final String ARG_PORT = "port"; - private static final int DEFAULT_PORT = 9249; + private static final String DEFAULT_PORT = "9249"; private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]"); private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() { @@ -72,8 +72,8 @@ public class PrometheusReporter implements MetricReporter { private static final char SCOPE_SEPARATOR = '_'; private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; - private PrometheusEndpoint prometheusEndpoint; - private final Map<String, Collector> collectorsByMetricName = new HashMap<>(); + private HTTPServer httpServer; + private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>(); @VisibleForTesting static String replaceInvalidChars(final String input) { @@ -84,27 +84,34 @@ public class PrometheusReporter implements MetricReporter { @Override public void open(MetricConfig config) { - int port = config.getInteger(ARG_PORT, DEFAULT_PORT); - LOG.info("Using port {}.", port); - prometheusEndpoint = new PrometheusEndpoint(port); - try { - prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true); - } catch (IOException e) { - final String msg = "Could not start PrometheusEndpoint on port " + port; - LOG.warn(msg, e); - throw new RuntimeException(msg, e); + String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT); + Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig); + + while (ports.hasNext()) { + int port = ports.next(); + try { + httpServer = new HTTPServer(port); + LOG.info("Started PrometheusReporter HTTP server on port {}.", port); + break; + } catch (IOException ioe) { //assume port conflict + LOG.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe); + } + } + if (httpServer == null) { + throw new RuntimeException("Could not start PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig); } } @Override public void close() { - prometheusEndpoint.stop(); + if (httpServer != null) { + httpServer.stop(); + } CollectorRegistry.defaultRegistry.clear(); } @Override public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) { - final String scope = SCOPE_PREFIX + getLogicalScope(group); List<String> dimensionKeys = new LinkedList<>(); List<String> dimensionValues = new LinkedList<>(); @@ -114,30 +121,86 @@ public class PrometheusReporter implements MetricReporter { dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue())); } - final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); - final String metricIdentifier = group.getMetricIdentifier(metricName); + final String scopedMetricName = getScopedName(metricName, group); + final String helpString = metricName + " (scope: " + getLogicalScope(group) + ")"; + final Collector collector; + Integer count = 0; + + synchronized (this) { + if (collectorsWithCountByMetricName.containsKey(scopedMetricName)) { + final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + collector = collectorWithCount.getKey(); + count = collectorWithCount.getValue(); + } else { + collector = createCollector(metric, dimensionKeys, dimensionValues, scopedMetricName, helpString); + try { + collector.register(); + } catch (Exception e) { + LOG.warn("There was a problem registering metric {}.", metricName, e); + } + } + addMetric(metric, dimensionValues, collector); + collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count + 1)); + } + } + + private static String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName); + } + + private static Collector createCollector(Metric metric, List<String> dimensionKeys, List<String> dimensionValues, String scopedMetricName, String helpString) { + Collector collector; + if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) { + collector = io.prometheus.client.Gauge + .build() + .name(scopedMetricName) + .help(helpString) + .labelNames(toArray(dimensionKeys)) + .create(); + } else if (metric instanceof Histogram) { + collector = new HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString, dimensionKeys, dimensionValues); + } else { + LOG.warn("Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", + metric.getClass().getName()); + collector = null; + } + return collector; + } + + private static void addMetric(Metric metric, List<String> dimensionValues, Collector collector) { if (metric instanceof Gauge) { - collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); } else if (metric instanceof Counter) { - collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); } else if (metric instanceof Meter) { - collector = createGauge((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); + ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); } else if (metric instanceof Histogram) { - collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues); + ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); } else { LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", metric.getClass().getName()); - return; } - collector.register(); - collectorsByMetricName.put(metricName, collector); } @Override public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) { - CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName)); - collectorsByMetricName.remove(metricName); + final String scopedMetricName = getScopedName(metricName, group); + synchronized (this) { + final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName); + final Integer count = collectorWithCount.getValue(); + final Collector collector = collectorWithCount.getKey(); + if (count == 1) { + try { + CollectorRegistry.defaultRegistry.unregister(collector); + } catch (Exception e) { + LOG.warn("There was a problem unregistering metric {}.", scopedMetricName, e); + } + collectorsWithCountByMetricName.remove(scopedMetricName); + } else { + collectorsWithCountByMetricName.put(scopedMetricName, new AbstractMap.SimpleImmutableEntry<>(collector, count - 1)); + } + } } @SuppressWarnings("unchecked") @@ -145,97 +208,65 @@ public class PrometheusReporter implements MetricReporter { return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR); } - private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) { - return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() { + @VisibleForTesting + static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) { + return new io.prometheus.client.Gauge.Child() { @Override public double get() { final Object value = gauge.getValue(); + if (value == null) { + LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge); + return 0; + } if (value instanceof Double) { return (double) value; } if (value instanceof Number) { return ((Number) value).doubleValue(); - } else if (value instanceof Boolean) { + } + if (value instanceof Boolean) { return ((Boolean) value) ? 1 : 0; - } else { - LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", - gauge, value.getClass().getName()); - return 0; } + LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.", + gauge, value.getClass().getName()); + return 0; } - }); + }; } - private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) { - return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() { + private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) { + return new io.prometheus.client.Gauge.Child() { @Override public double get() { return (double) counter.getCount(); } - }); + }; } - private Collector createGauge(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) { - return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() { + private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) { + return new io.prometheus.client.Gauge.Child() { @Override public double get() { return meter.getRate(); } - }); - } - - private static Collector newGauge(String name, String identifier, List<String> labelNames, List<String> labelValues, io.prometheus.client.Gauge.Child child) { - return io.prometheus.client.Gauge - .build() - .name(name) - .help(identifier) - .labelNames(toArray(labelNames)) - .create() - .setChild(child, toArray(labelValues)); - } - - private static HistogramSummaryProxy createSummary(final Histogram histogram, final String name, final String identifier, final List<String> dimensionKeys, final List<String> dimensionValues) { - return new HistogramSummaryProxy(histogram, name, identifier, dimensionKeys, dimensionValues); - } - - static class PrometheusEndpoint extends NanoHTTPD { - static final String MIME_TYPE = "plain/text"; - - PrometheusEndpoint(int port) { - super(port); - } - - @Override - public Response serve(IHTTPSession session) { - if (session.getUri().equals("/metrics")) { - StringWriter writer = new StringWriter(); - try { - TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples()); - } catch (IOException e) { - return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to output metrics"); - } - return newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, writer.toString()); - } else { - return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found"); - } - } + }; } - private static class HistogramSummaryProxy extends Collector { - private static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); + @VisibleForTesting + static class HistogramSummaryProxy extends Collector { + static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999); - private final Histogram histogram; private final String metricName; - private final String metricIdentifier; + private final String helpString; private final List<String> labelNamesWithQuantile; - private final List<String> labelValues; - HistogramSummaryProxy(final Histogram histogram, final String metricName, final String metricIdentifier, final List<String> labelNames, final List<String> labelValues) { - this.histogram = histogram; + private final Map<List<String>, Histogram> histogramsByLabelValues = new HashMap<>(); + + HistogramSummaryProxy(final Histogram histogram, final String metricName, final String helpString, final List<String> labelNames, final List<String> labelValues) { this.metricName = metricName; - this.metricIdentifier = metricIdentifier; + this.helpString = helpString; this.labelNamesWithQuantile = addToList(labelNames, "quantile"); - this.labelValues = labelValues; + histogramsByLabelValues.put(labelValues, histogram); } @Override @@ -243,17 +274,25 @@ public class PrometheusReporter implements MetricReporter { // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms, // whose snapshot's values array only holds a sample of recent values). - final HistogramStatistics statistics = histogram.getStatistics(); - List<MetricFamilySamples.Sample> samples = new LinkedList<>(); + for (Map.Entry<List<String>, Histogram> labelValuesToHistogram : histogramsByLabelValues.entrySet()) { + addSamples(labelValuesToHistogram.getKey(), labelValuesToHistogram.getValue(), samples); + } + return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples)); + } + + void addChild(final Histogram histogram, final List<String> labelValues) { + histogramsByLabelValues.put(labelValues, histogram); + } + + private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) { samples.add(new MetricFamilySamples.Sample(metricName + "_count", labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount())); for (final Double quantile : QUANTILES) { samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile, addToList(labelValues, quantile.toString()), - statistics.getQuantile(quantile))); + histogram.getStatistics().getQuantile(quantile))); } - return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, metricIdentifier, samples)); } } @@ -263,7 +302,7 @@ public class PrometheusReporter implements MetricReporter { return result; } - private static String[] toArray(List<String> labelNames) { - return labelNames.toArray(new String[labelNames.size()]); + private static String[] toArray(List<String> list) { + return list.toArray(new String[list.size()]); } } http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java new file mode 100644 index 0000000..c7d4040 --- /dev/null +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.util.TestMeter; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.util.TestingHistogram; +import org.apache.flink.util.AbstractID; + +import com.mashape.unirest.http.exceptions.UnirestException; +import io.prometheus.client.CollectorRegistry; +import org.junit.After; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.flink.metrics.prometheus.PrometheusReporterTest.createConfigWithOneReporter; +import static org.apache.flink.metrics.prometheus.PrometheusReporterTest.pollMetrics; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +/** + * Test for {@link PrometheusReporter} that registers several instances of the same metric for different subtasks. + */ +public class PrometheusReporterTaskScopeTest { + private static final String[] LABEL_NAMES = {"job_id", "task_id", "task_attempt_id", "host", "task_name", "task_attempt_num", "job_name", "tm_id", "subtask_index"}; + + private static final String TASK_MANAGER_HOST = "taskManagerHostName"; + private static final String TASK_MANAGER_ID = "taskManagerId"; + private static final String JOB_NAME = "jobName"; + private static final String TASK_NAME = "taskName"; + private static final int ATTEMPT_NUMBER = 0; + private static final int SUBTASK_INDEX_1 = 0; + private static final int SUBTASK_INDEX_2 = 1; + + private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9429"))); + + private final JobID jobId = new JobID(); + private final JobVertexID taskId1 = new JobVertexID(); + private final AbstractID taskAttemptId1 = new AbstractID(); + private final String[] labelValues1 = {jobId.toString(), taskId1.toString(), taskAttemptId1.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_1}; + private final JobVertexID taskId2 = new JobVertexID(); + private final AbstractID taskAttemptId2 = new AbstractID(); + private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; + + private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); + private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); + private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + + @Test + public void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException { + Counter counter1 = new SimpleCounter(); + counter1.inc(1); + Counter counter2 = new SimpleCounter(); + counter2.inc(2); + + taskMetricGroup1.counter("my_counter", counter1); + taskMetricGroup2.counter("my_counter", counter2); + + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1), + equalTo(1.)); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2), + equalTo(2.)); + } + + @Test + public void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException { + Gauge<Integer> gauge1 = new Gauge<Integer>() { + @Override + public Integer getValue() { + return 3; + } + }; + Gauge<Integer> gauge2 = new Gauge<Integer>() { + @Override + public Integer getValue() { + return 4; + } + }; + + taskMetricGroup1.gauge("my_gauge", gauge1); + taskMetricGroup2.gauge("my_gauge", gauge2); + + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1), + equalTo(3.)); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2), + equalTo(4.)); + } + + @Test + public void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException { + Meter meter = new TestMeter(); + + taskMetricGroup1.meter("my_meter", meter); + taskMetricGroup2.meter("my_meter", meter); + + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues1), + equalTo(5.)); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues2), + equalTo(5.)); + } + + @Test + public void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException { + Histogram histogram = new TestingHistogram(); + + taskMetricGroup1.histogram("my_histogram", histogram); + taskMetricGroup2.histogram("my_histogram", histogram); + + final String exportedMetrics = pollMetrics().getBody(); + assertThat(exportedMetrics, containsString("subtask_index=\"0\",quantile=\"0.5\",} 0.5")); // histogram + assertThat(exportedMetrics, containsString("subtask_index=\"1\",quantile=\"0.5\",} 0.5")); // histogram + + final String[] labelNamesWithQuantile = addToArray(LABEL_NAMES, "quantile"); + for (Double quantile : PrometheusReporter.HistogramSummaryProxy.QUANTILES) { + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_histogram", labelNamesWithQuantile, addToArray(labelValues1, "" + quantile)), + equalTo(quantile)); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_histogram", labelNamesWithQuantile, addToArray(labelValues2, "" + quantile)), + equalTo(quantile)); + } + } + + @Test + public void removingSingleInstanceOfMetricDoesNotBreakOtherInstances() throws UnirestException { + Counter counter1 = new SimpleCounter(); + counter1.inc(1); + Counter counter2 = new SimpleCounter(); + counter2.inc(2); + + taskMetricGroup1.counter("my_counter", counter1); + taskMetricGroup2.counter("my_counter", counter2); + + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1), + equalTo(1.)); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2), + equalTo(2.)); + + taskMetricGroup2.close(); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1), + equalTo(1.)); + + taskMetricGroup1.close(); + assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1), + nullValue()); + } + + private String[] addToArray(String[] array, String element) { + final String[] labelNames = Arrays.copyOf(array, LABEL_NAMES.length + 1); + labelNames[LABEL_NAMES.length] = element; + return labelNames; + } + + @After + public void shutdownRegistry() { + registry.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 83b7b41..956339b 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -47,13 +47,13 @@ import org.junit.rules.ExpectedException; import java.util.Arrays; import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT; -import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; /** - * Test for {@link PrometheusReporter}. + * Basic test for {@link PrometheusReporter}. */ public class PrometheusReporterTest extends TestLogger { private static final int NON_DEFAULT_PORT = 9429; @@ -70,22 +70,21 @@ public class PrometheusReporterTest extends TestLogger { @Rule public ExpectedException thrown = ExpectedException.none(); - private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter())); + private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "" + NON_DEFAULT_PORT))); + private final FrontMetricGroup<TaskManagerMetricGroup> metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); private final MetricReporter reporter = registry.getReporters().get(0); + /** + * {@link io.prometheus.client.Counter} may not decrease, so report {@link Counter} as {@link io.prometheus.client.Gauge}. + * + * @throws UnirestException Might be thrown on HTTP problems. + */ @Test public void counterIsReportedAsPrometheusGauge() throws UnirestException { - //Prometheus counters may not decrease Counter testCounter = new SimpleCounter(); testCounter.inc(7); - String counterName = "testCounter"; - String gaugeName = SCOPE_PREFIX + counterName; - - assertThat(addMetricAndPollResponse(testCounter, counterName), - equalTo(HELP_PREFIX + gaugeName + " " + getFullMetricName(counterName) + "\n" + - TYPE_PREFIX + gaugeName + " gauge" + "\n" + - gaugeName + DEFAULT_LABELS + " 7.0" + "\n")); + assertThatGaugeIsExported(testCounter, "testCounter", "7.0"); } @Test @@ -97,13 +96,34 @@ public class PrometheusReporterTest extends TestLogger { } }; - String gaugeName = "testGauge"; - String prometheusGaugeName = SCOPE_PREFIX + gaugeName; + assertThatGaugeIsExported(testGauge, "testGauge", "1.0"); + } + + @Test + public void nullGaugeDoesNotBreakReporter() throws UnirestException { + Gauge<Integer> testGauge = new Gauge<Integer>() { + @Override + public Integer getValue() { + return null; + } + }; + + assertThatGaugeIsExported(testGauge, "testGauge", "0.0"); + } + + @Test + public void meterRateIsReportedAsPrometheusGauge() throws UnirestException { + Meter testMeter = new TestMeter(); + + assertThatGaugeIsExported(testMeter, "testMeter", "5.0"); + } - assertThat(addMetricAndPollResponse(testGauge, gaugeName), - equalTo(HELP_PREFIX + prometheusGaugeName + " " + getFullMetricName(gaugeName) + "\n" + - TYPE_PREFIX + prometheusGaugeName + " gauge" + "\n" + - prometheusGaugeName + DEFAULT_LABELS + " 1.0" + "\n")); + private void assertThatGaugeIsExported(Metric metric, String name, String expectedValue) throws UnirestException { + final String prometheusName = SCOPE_PREFIX + name; + assertThat(addMetricAndPollResponse(metric, name), + containsString(HELP_PREFIX + prometheusName + " " + name + " (scope: taskmanager)\n" + + TYPE_PREFIX + prometheusName + " gauge" + "\n" + + prometheusName + DEFAULT_LABELS + " " + expectedValue + "\n")); } @Test @@ -114,7 +134,7 @@ public class PrometheusReporterTest extends TestLogger { String summaryName = SCOPE_PREFIX + histogramName; String response = addMetricAndPollResponse(testHistogram, histogramName); - assertThat(response, containsString(HELP_PREFIX + summaryName + " " + getFullMetricName(histogramName) + "\n" + + assertThat(response, containsString(HELP_PREFIX + summaryName + " " + histogramName + " (scope: taskmanager)\n" + TYPE_PREFIX + summaryName + " summary" + "\n" + summaryName + "_count" + DEFAULT_LABELS + " 1.0" + "\n")); for (String quantile : Arrays.asList("0.5", "0.75", "0.95", "0.98", "0.99", "0.999")) { @@ -124,19 +144,6 @@ public class PrometheusReporterTest extends TestLogger { } @Test - public void meterRateIsReportedAsPrometheusGauge() throws UnirestException { - Meter testMeter = new TestMeter(); - - String meterName = "testMeter"; - String counterName = SCOPE_PREFIX + meterName; - - assertThat(addMetricAndPollResponse(testMeter, meterName), - equalTo(HELP_PREFIX + counterName + " " + getFullMetricName(meterName) + "\n" + - TYPE_PREFIX + counterName + " gauge" + "\n" + - counterName + DEFAULT_LABELS + " 5.0" + "\n")); - } - - @Test public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException { reporter.close(); thrown.expect(UnirestException.class); @@ -160,25 +167,119 @@ public class PrometheusReporterTest extends TestLogger { assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c")); } + @Test + public void doubleGaugeIsConvertedCorrectly() { + assertThat(PrometheusReporter.gaugeFrom(new Gauge<Double>() { + @Override + public Double getValue() { + return 3.14; + } + }).get(), equalTo(3.14)); + } + + @Test + public void shortGaugeIsConvertedCorrectly() { + assertThat(PrometheusReporter.gaugeFrom(new Gauge<Short>() { + @Override + public Short getValue() { + return 13; + } + }).get(), equalTo(13.)); + } + + @Test + public void booleanGaugeIsConvertedCorrectly() { + assertThat(PrometheusReporter.gaugeFrom(new Gauge<Boolean>() { + @Override + public Boolean getValue() { + return true; + } + }).get(), equalTo(1.)); + } + + /** + * Prometheus only supports numbers, so report non-numeric gauges as 0. + */ + @Test + public void stringGaugeCannotBeConverted() { + assertThat(PrometheusReporter.gaugeFrom(new Gauge<String>() { + @Override + public String getValue() { + return "I am not a number"; + } + }).get(), equalTo(0.)); + } + + @Test + public void registeringSameMetricTwiceDoesNotThrowException() { + Counter counter = new SimpleCounter(); + counter.inc(); + String counterName = "testCounter"; + + reporter.notifyOfAddedMetric(counter, counterName, metricGroup); + reporter.notifyOfAddedMetric(counter, counterName, metricGroup); + } + + @Test + public void addingUnknownMetricTypeDoesNotThrowException(){ + class SomeMetricType implements Metric{} + + reporter.notifyOfAddedMetric(new SomeMetricType(), "name", metricGroup); + } + + @Test + public void cannotStartTwoReportersOnSamePort() { + final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "12345"))); + final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "12345"))); + + assertThat(fixedPort1.getReporters(), hasSize(1)); + assertThat(fixedPort2.getReporters(), hasSize(0)); + + fixedPort1.shutdown(); + fixedPort2.shutdown(); + } + + @Test + public void canStartTwoReportersWhenUsingPortRange() { + final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9249-9252"))); + final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9249-9252"))); + + assertThat(portRange1.getReporters(), hasSize(1)); + assertThat(portRange2.getReporters(), hasSize(1)); + + portRange1.shutdown(); + portRange2.shutdown(); + } + + @Test + public void cannotStartThreeReportersWhenPortRangeIsTooSmall() { + final MetricRegistry smallPortRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9253-9254"))); + final MetricRegistry smallPortRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9253-9254"))); + final MetricRegistry smallPortRange3 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test3", "9253-9254"))); + + assertThat(smallPortRange1.getReporters(), hasSize(1)); + assertThat(smallPortRange2.getReporters(), hasSize(1)); + assertThat(smallPortRange3.getReporters(), hasSize(0)); + + smallPortRange1.shutdown(); + smallPortRange2.shutdown(); + smallPortRange3.shutdown(); + } + private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException { - reporter.notifyOfAddedMetric(metric, metricName, new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER))); + reporter.notifyOfAddedMetric(metric, metricName, metricGroup); return pollMetrics().getBody(); } - private static HttpResponse<String> pollMetrics() throws UnirestException { + static HttpResponse<String> pollMetrics() throws UnirestException { return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString(); } - private static String getFullMetricName(String metricName) { - return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName; - } - - private static Configuration createConfigWithOneReporter() { + static Configuration createConfigWithOneReporter(String reporterName, String portString) { Configuration cfg = new Configuration(); - cfg.setString(MetricOptions.REPORTERS_LIST, "test1"); - cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + - ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName()); - cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ARG_PORT, "" + NON_DEFAULT_PORT); + cfg.setString(MetricOptions.REPORTERS_LIST, reporterName); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName()); + cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ARG_PORT, portString); return cfg; } http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b96f84c..9384a48 100644 --- a/pom.xml +++ b/pom.xml @@ -116,7 +116,7 @@ under the License. <curator.version>2.12.0</curator.version> <jackson.version>2.7.4</jackson.version> <metrics.version>3.1.5</metrics.version> - <prometheus.version>0.0.21</prometheus.version> + <prometheus.version>0.0.26</prometheus.version> <junit.version>4.12</junit.version> <mockito.version>1.10.19</mockito.version> <powermock.version>1.6.5</powermock.version>
