Repository: flink
Updated Branches:
  refs/heads/master c0199f5d1 -> 56017a98f


[FLINK-7502][metrics] Improve PrometheusReporter

* Do not throw exception when same metric is added twice
* Add possibility to configure port range
* Bump prometheus.version 0.0.21 -> 0.0.26
* Use simpleclient_httpserver instead of nanohttpd
* Guard gauge reporting against null values
* Guard close() against an NPE when the HTTP server was never started

This closes #4586.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56017a98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56017a98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56017a98

Branch: refs/heads/master
Commit: 56017a98fa61fdfae1c8dadd90a378ffdb3fea72
Parents: c0199f5
Author: Maximilian Bode <[email protected]>
Authored: Thu Aug 24 15:59:24 2017 +0200
Committer: zentol <[email protected]>
Committed: Thu Oct 26 09:48:21 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |   6 +-
 flink-metrics/flink-metrics-prometheus/pom.xml  |  19 +-
 .../metrics/prometheus/PrometheusReporter.java  | 229 +++++++++++--------
 .../PrometheusReporterTaskScopeTest.java        | 188 +++++++++++++++
 .../prometheus/PrometheusReporterTest.java      | 185 +++++++++++----
 pom.xml                                         |   2 +-
 6 files changed, 477 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e191101..64d7318 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -424,7 +424,7 @@ of your Flink distribution.
 
 Parameters:
 
-- `port` - (optional) the port the Prometheus exporter listens on, defaults to 
[9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations).
+- `port` - (optional) the port the Prometheus exporter listens on, defaults to 
[9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations). 
In order to be able to run several instances of the reporter on one host (e.g. 
when one TaskManager is colocated with the JobManager) it is advisable to use a 
port range like `9250-9260`.
 
 Example configuration:
 
@@ -440,11 +440,11 @@ Flink metric types are mapped to Prometheus metric types 
as follows:
 | Flink     | Prometheus | Note                                     |
 | --------- |------------|------------------------------------------|
 | Counter   | Gauge      |Prometheus counters cannot be decremented.|
-| Gauge     | Gauge      |                                          |
+| Gauge     | Gauge      |Only numbers and booleans are supported.  |
 | Histogram | Summary    |Quantiles .5, .75, .95, .98, .99 and .999 |
 | Meter     | Gauge      |The gauge exports the meter's rate.       |
 
-All Flink metrics variables, such as `<host>`, `<job_name>`, `<tm_id>`, 
`<subtask_index>`, `<task_name>` and `<operator_name>`, are exported to 
Prometheus as labels. 
+All Flink metrics variables (see [List of all 
Variables](#list-of-all-variables)) are exported to Prometheus as labels. 
 
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml 
b/flink-metrics/flink-metrics-prometheus/pom.xml
index f8f2eea..0e9b261 100644
--- a/flink-metrics/flink-metrics-prometheus/pom.xml
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -42,6 +42,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
@@ -62,16 +69,10 @@ under the License.
 
                <dependency>
                        <groupId>io.prometheus</groupId>
-                       <artifactId>simpleclient_servlet</artifactId>
+                       <artifactId>simpleclient_httpserver</artifactId>
                        <version>${prometheus.version}</version>
                </dependency>
 
-               <dependency>
-                       <groupId>org.nanohttpd</groupId>
-                       <artifactId>nanohttpd</artifactId>
-                       <version>2.2.0</version>
-               </dependency>
-
                <!-- test dependencies -->
 
                <dependency>
@@ -114,10 +115,6 @@ under the License.
                                                                        
<pattern>io.prometheus.client</pattern>
                                                                        
<shadedPattern>org.apache.flink.shaded.io.prometheus.client</shadedPattern>
                                                                </relocation>
-                                                               <relocation>
-                                                                       
<pattern>fi.iki.elonen</pattern>
-                                                                       
<shadedPattern>org.apache.flink.shaded.fi.iki.elonen</shadedPattern>
-                                                               </relocation>
                                                        </relocations>
                                                </configuration>
                                        </execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index d23be8c..1e44ab9 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -24,7 +24,6 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
@@ -32,20 +31,21 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.util.NetUtils;
 
-import fi.iki.elonen.NanoHTTPD;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
+import io.prometheus.client.exporter.HTTPServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.StringWriter;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +59,7 @@ public class PrometheusReporter implements MetricReporter {
        private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporter.class);
 
        static final String ARG_PORT = "port";
-       private static final int DEFAULT_PORT = 9249;
+       private static final String DEFAULT_PORT = "9249";
 
        private static final Pattern UNALLOWED_CHAR_PATTERN = 
Pattern.compile("[^a-zA-Z0-9:_]");
        private static final CharacterFilter CHARACTER_FILTER = new 
CharacterFilter() {
@@ -72,8 +72,8 @@ public class PrometheusReporter implements MetricReporter {
        private static final char SCOPE_SEPARATOR = '_';
        private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
 
-       private PrometheusEndpoint prometheusEndpoint;
-       private final Map<String, Collector> collectorsByMetricName = new 
HashMap<>();
+       private HTTPServer httpServer;
+       private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, 
Integer>> collectorsWithCountByMetricName = new HashMap<>();
 
        @VisibleForTesting
        static String replaceInvalidChars(final String input) {
@@ -84,27 +84,34 @@ public class PrometheusReporter implements MetricReporter {
 
        @Override
        public void open(MetricConfig config) {
-               int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
-               LOG.info("Using port {}.", port);
-               prometheusEndpoint = new PrometheusEndpoint(port);
-               try {
-                       prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, 
true);
-               } catch (IOException e) {
-                       final String msg = "Could not start PrometheusEndpoint 
on port " + port;
-                       LOG.warn(msg, e);
-                       throw new RuntimeException(msg, e);
+               String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
+               Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(portsConfig);
+
+               while (ports.hasNext()) {
+                       int port = ports.next();
+                       try {
+                               httpServer = new HTTPServer(port);
+                               LOG.info("Started PrometheusReporter HTTP 
server on port {}.", port);
+                               break;
+                       } catch (IOException ioe) { //assume port conflict
+                               LOG.debug("Could not start PrometheusReporter 
HTTP server on port {}.", port, ioe);
+                       }
+               }
+               if (httpServer == null) {
+                       throw new RuntimeException("Could not start 
PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig);
                }
        }
 
        @Override
        public void close() {
-               prometheusEndpoint.stop();
+               if (httpServer != null) {
+                       httpServer.stop();
+               }
                CollectorRegistry.defaultRegistry.clear();
        }
 
        @Override
        public void notifyOfAddedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
-               final String scope = SCOPE_PREFIX + getLogicalScope(group);
 
                List<String> dimensionKeys = new LinkedList<>();
                List<String> dimensionValues = new LinkedList<>();
@@ -114,30 +121,86 @@ public class PrometheusReporter implements MetricReporter 
{
                        
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
                }
 
-               final String validMetricName = scope + SCOPE_SEPARATOR + 
CHARACTER_FILTER.filterCharacters(metricName);
-               final String metricIdentifier = 
group.getMetricIdentifier(metricName);
+               final String scopedMetricName = getScopedName(metricName, 
group);
+               final String helpString = metricName + " (scope: " + 
getLogicalScope(group) + ")";
+
                final Collector collector;
+               Integer count = 0;
+
+               synchronized (this) {
+                       if 
(collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
+                               final 
AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
+                               collector = collectorWithCount.getKey();
+                               count = collectorWithCount.getValue();
+                       } else {
+                               collector = createCollector(metric, 
dimensionKeys, dimensionValues, scopedMetricName, helpString);
+                               try {
+                                       collector.register();
+                               } catch (Exception e) {
+                                       LOG.warn("There was a problem 
registering metric {}.", metricName, e);
+                               }
+                       }
+                       addMetric(metric, dimensionValues, collector);
+                       collectorsWithCountByMetricName.put(scopedMetricName, 
new AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
+               }
+       }
+
+       private static String getScopedName(String metricName, MetricGroup 
group) {
+               return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR 
+ CHARACTER_FILTER.filterCharacters(metricName);
+       }
+
+       private static Collector createCollector(Metric metric, List<String> 
dimensionKeys, List<String> dimensionValues, String scopedMetricName, String 
helpString) {
+               Collector collector;
+               if (metric instanceof Gauge || metric instanceof Counter || 
metric instanceof Meter) {
+                       collector = io.prometheus.client.Gauge
+                               .build()
+                               .name(scopedMetricName)
+                               .help(helpString)
+                               .labelNames(toArray(dimensionKeys))
+                               .create();
+               } else if (metric instanceof Histogram) {
+                       collector = new HistogramSummaryProxy((Histogram) 
metric, scopedMetricName, helpString, dimensionKeys, dimensionValues);
+               } else {
+                       LOG.warn("Cannot create collector for unknown metric 
type: {}. This indicates that the metric type is not supported by this 
reporter.",
+                               metric.getClass().getName());
+                       collector = null;
+               }
+               return collector;
+       }
+
+       private static void addMetric(Metric metric, List<String> 
dimensionValues, Collector collector) {
                if (metric instanceof Gauge) {
-                       collector = createGauge((Gauge) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
                } else if (metric instanceof Counter) {
-                       collector = createGauge((Counter) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
                } else if (metric instanceof Meter) {
-                       collector = createGauge((Meter) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+                       ((io.prometheus.client.Gauge) 
collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
                } else if (metric instanceof Histogram) {
-                       collector = createSummary((Histogram) metric, 
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+                       ((HistogramSummaryProxy) 
collector).addChild((Histogram) metric, dimensionValues);
                } else {
                        LOG.warn("Cannot add unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
                                metric.getClass().getName());
-                       return;
                }
-               collector.register();
-               collectorsByMetricName.put(metricName, collector);
        }
 
        @Override
        public void notifyOfRemovedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
-               
CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
-               collectorsByMetricName.remove(metricName);
+               final String scopedMetricName = getScopedName(metricName, 
group);
+               synchronized (this) {
+                       final AbstractMap.SimpleImmutableEntry<Collector, 
Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
+                       final Integer count = collectorWithCount.getValue();
+                       final Collector collector = collectorWithCount.getKey();
+                       if (count == 1) {
+                               try {
+                                       
CollectorRegistry.defaultRegistry.unregister(collector);
+                               } catch (Exception e) {
+                                       LOG.warn("There was a problem 
unregistering metric {}.", scopedMetricName, e);
+                               }
+                               
collectorsWithCountByMetricName.remove(scopedMetricName);
+                       } else {
+                               
collectorsWithCountByMetricName.put(scopedMetricName, new 
AbstractMap.SimpleImmutableEntry<>(collector, count - 1));
+                       }
+               }
        }
 
        @SuppressWarnings("unchecked")
@@ -145,97 +208,65 @@ public class PrometheusReporter implements MetricReporter 
{
                return ((FrontMetricGroup<AbstractMetricGroup<?>>) 
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
        }
 
-       private Collector createGauge(final Gauge gauge, final String name, 
final String identifier, final List<String> labelNames, final List<String> 
labelValues) {
-               return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+       @VisibleForTesting
+       static io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
+               return new io.prometheus.client.Gauge.Child() {
                        @Override
                        public double get() {
                                final Object value = gauge.getValue();
+                               if (value == null) {
+                                       LOG.debug("Gauge {} is null-valued, 
defaulting to 0.", gauge);
+                                       return 0;
+                               }
                                if (value instanceof Double) {
                                        return (double) value;
                                }
                                if (value instanceof Number) {
                                        return ((Number) value).doubleValue();
-                               } else if (value instanceof Boolean) {
+                               }
+                               if (value instanceof Boolean) {
                                        return ((Boolean) value) ? 1 : 0;
-                               } else {
-                                       LOG.debug("Invalid type for Gauge {}: 
{}, only number types and booleans are supported by this reporter.",
-                                               gauge, 
value.getClass().getName());
-                                       return 0;
                                }
+                               LOG.debug("Invalid type for Gauge {}: {}, only 
number types and booleans are supported by this reporter.",
+                                       gauge, value.getClass().getName());
+                               return 0;
                        }
-               });
+               };
        }
 
-       private static Collector createGauge(final Counter counter, final 
String name, final String identifier, final List<String> labelNames, final 
List<String> labelValues) {
-               return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+       private static io.prometheus.client.Gauge.Child gaugeFrom(Counter 
counter) {
+               return new io.prometheus.client.Gauge.Child() {
                        @Override
                        public double get() {
                                return (double) counter.getCount();
                        }
-               });
+               };
        }
 
-       private Collector createGauge(final Meter meter, final String name, 
final String identifier, final List<String> labelNames, final List<String> 
labelValues) {
-               return newGauge(name, identifier, labelNames, labelValues, new 
io.prometheus.client.Gauge.Child() {
+       private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+               return new io.prometheus.client.Gauge.Child() {
                        @Override
                        public double get() {
                                return meter.getRate();
                        }
-               });
-       }
-
-       private static Collector newGauge(String name, String identifier, 
List<String> labelNames, List<String> labelValues, 
io.prometheus.client.Gauge.Child child) {
-               return io.prometheus.client.Gauge
-                       .build()
-                       .name(name)
-                       .help(identifier)
-                       .labelNames(toArray(labelNames))
-                       .create()
-                       .setChild(child, toArray(labelValues));
-       }
-
-       private static HistogramSummaryProxy createSummary(final Histogram 
histogram, final String name, final String identifier, final List<String> 
dimensionKeys, final List<String> dimensionValues) {
-               return new HistogramSummaryProxy(histogram, name, identifier, 
dimensionKeys, dimensionValues);
-       }
-
-       static class PrometheusEndpoint extends NanoHTTPD {
-               static final String MIME_TYPE = "plain/text";
-
-               PrometheusEndpoint(int port) {
-                       super(port);
-               }
-
-               @Override
-               public Response serve(IHTTPSession session) {
-                       if (session.getUri().equals("/metrics")) {
-                               StringWriter writer = new StringWriter();
-                               try {
-                                       TextFormat.write004(writer, 
CollectorRegistry.defaultRegistry.metricFamilySamples());
-                               } catch (IOException e) {
-                                       return 
newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to 
output metrics");
-                               }
-                               return 
newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, 
writer.toString());
-                       } else {
-                               return 
newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found");
-                       }
-               }
+               };
        }
 
-       private static class HistogramSummaryProxy extends Collector {
-               private static final List<Double> QUANTILES = Arrays.asList(.5, 
.75, .95, .98, .99, .999);
+       @VisibleForTesting
+       static class HistogramSummaryProxy extends Collector {
+               static final List<Double> QUANTILES = Arrays.asList(.5, .75, 
.95, .98, .99, .999);
 
-               private final Histogram histogram;
                private final String metricName;
-               private final String metricIdentifier;
+               private final String helpString;
                private final List<String> labelNamesWithQuantile;
-               private final List<String> labelValues;
 
-               HistogramSummaryProxy(final Histogram histogram, final String 
metricName, final String metricIdentifier, final List<String> labelNames, final 
List<String> labelValues) {
-                       this.histogram = histogram;
+               private final Map<List<String>, Histogram> 
histogramsByLabelValues = new HashMap<>();
+
+               HistogramSummaryProxy(final Histogram histogram, final String 
metricName, final String helpString, final List<String> labelNames, final 
List<String> labelValues) {
                        this.metricName = metricName;
-                       this.metricIdentifier = metricIdentifier;
+                       this.helpString = helpString;
                        this.labelNamesWithQuantile = addToList(labelNames, 
"quantile");
-                       this.labelValues = labelValues;
+                       histogramsByLabelValues.put(labelValues, histogram);
                }
 
                @Override
@@ -243,17 +274,25 @@ public class PrometheusReporter implements MetricReporter 
{
                        // We cannot use SummaryMetricFamily because it is 
impossible to get a sum of all values (at least for Dropwizard histograms,
                        // whose snapshot's values array only holds a sample of 
recent values).
 
-                       final HistogramStatistics statistics = 
histogram.getStatistics();
-
                        List<MetricFamilySamples.Sample> samples = new 
LinkedList<>();
+                       for (Map.Entry<List<String>, Histogram> 
labelValuesToHistogram : histogramsByLabelValues.entrySet()) {
+                               addSamples(labelValuesToHistogram.getKey(), 
labelValuesToHistogram.getValue(), samples);
+                       }
+                       return Collections.singletonList(new 
MetricFamilySamples(metricName, Type.SUMMARY, helpString, samples));
+               }
+
+               void addChild(final Histogram histogram, final List<String> 
labelValues) {
+                       histogramsByLabelValues.put(labelValues, histogram);
+               }
+
+               private void addSamples(final List<String> labelValues, final 
Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
                        samples.add(new MetricFamilySamples.Sample(metricName + 
"_count",
                                labelNamesWithQuantile.subList(0, 
labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
                        for (final Double quantile : QUANTILES) {
                                samples.add(new 
MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
                                        addToList(labelValues, 
quantile.toString()),
-                                       statistics.getQuantile(quantile)));
+                                       
histogram.getStatistics().getQuantile(quantile)));
                        }
-                       return Collections.singletonList(new 
MetricFamilySamples(metricName, Type.SUMMARY, metricIdentifier, samples));
                }
        }
 
@@ -263,7 +302,7 @@ public class PrometheusReporter implements MetricReporter {
                return result;
        }
 
-       private static String[] toArray(List<String> labelNames) {
-               return labelNames.toArray(new String[labelNames.size()]);
+       private static String[] toArray(List<String> list) {
+               return list.toArray(new String[list.size()]);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
new file mode 100644
index 0000000..c7d4040
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.AbstractID;
+
+import com.mashape.unirest.http.exceptions.UnirestException;
+import io.prometheus.client.CollectorRegistry;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.metrics.prometheus.PrometheusReporterTest.createConfigWithOneReporter;
+import static 
org.apache.flink.metrics.prometheus.PrometheusReporterTest.pollMetrics;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link PrometheusReporter} that registers several instances of the same metric for different subtasks.
+ */
+public class PrometheusReporterTaskScopeTest {
+       private static final String[] LABEL_NAMES = {"job_id", "task_id", "task_attempt_id", "host", "task_name", "task_attempt_num", "job_name", "tm_id", "subtask_index"};
+
+       private static final String TASK_MANAGER_HOST = "taskManagerHostName";
+       private static final String TASK_MANAGER_ID = "taskManagerId";
+       private static final String JOB_NAME = "jobName";
+       private static final String TASK_NAME = "taskName";
+       private static final int ATTEMPT_NUMBER = 0;
+       private static final int SUBTASK_INDEX_1 = 0;
+       private static final int SUBTASK_INDEX_2 = 1;
+
+       private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9429")));
+
+       private final JobID jobId = new JobID();
+       private final JobVertexID taskId1 = new JobVertexID();
+       private final AbstractID taskAttemptId1 = new AbstractID();
+       private final String[] labelValues1 = {jobId.toString(), taskId1.toString(), taskAttemptId1.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_1}; // same order as LABEL_NAMES
+       private final JobVertexID taskId2 = new JobVertexID();
+       private final AbstractID taskAttemptId2 = new AbstractID();
+       private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; // same order as LABEL_NAMES
+
+       private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
+       private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
+       private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
+       private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
+
+       @Test
+       public void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
+               Counter counter1 = new SimpleCounter();
+               counter1.inc(1);
+               Counter counter2 = new SimpleCounter();
+               counter2.inc(2);
+
+               taskMetricGroup1.counter("my_counter", counter1);
+               taskMetricGroup2.counter("my_counter", counter2);
+
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1),
+                       equalTo(1.));
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2),
+                       equalTo(2.));
+       }
+
+       @Test
+       public void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
+               Gauge<Integer> gauge1 = new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 3;
+                       }
+               };
+               Gauge<Integer> gauge2 = new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 4;
+                       }
+               };
+
+               taskMetricGroup1.gauge("my_gauge", gauge1);
+               taskMetricGroup2.gauge("my_gauge", gauge2);
+
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1),
+                       equalTo(3.));
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2),
+                       equalTo(4.));
+       }
+
+       @Test
+       public void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
+               Meter meter = new TestMeter(); // one instance shared by both task groups, hence identical expected values below
+
+               taskMetricGroup1.meter("my_meter", meter);
+               taskMetricGroup2.meter("my_meter", meter);
+
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues1),
+                       equalTo(5.));
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_meter", LABEL_NAMES, labelValues2),
+                       equalTo(5.));
+       }
+
+       @Test
+       public void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
+               Histogram histogram = new TestingHistogram(); // one instance shared by both task groups
+
+               taskMetricGroup1.histogram("my_histogram", histogram);
+               taskMetricGroup2.histogram("my_histogram", histogram);
+
+               final String exportedMetrics = pollMetrics().getBody();
+               assertThat(exportedMetrics, containsString("subtask_index=\"0\",quantile=\"0.5\",} 0.5")); // histogram
+               assertThat(exportedMetrics, containsString("subtask_index=\"1\",quantile=\"0.5\",} 0.5")); // histogram
+
+               final String[] labelNamesWithQuantile = addToArray(LABEL_NAMES, "quantile");
+               for (Double quantile : PrometheusReporter.HistogramSummaryProxy.QUANTILES) {
+                       assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_histogram", labelNamesWithQuantile, addToArray(labelValues1, "" + quantile)),
+                               equalTo(quantile));
+                       assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_histogram", labelNamesWithQuantile, addToArray(labelValues2, "" + quantile)),
+                               equalTo(quantile));
+               }
+       }
+
+       @Test
+       public void removingSingleInstanceOfMetricDoesNotBreakOtherInstances() throws UnirestException {
+               Counter counter1 = new SimpleCounter();
+               counter1.inc(1);
+               Counter counter2 = new SimpleCounter();
+               counter2.inc(2);
+
+               taskMetricGroup1.counter("my_counter", counter1);
+               taskMetricGroup2.counter("my_counter", counter2);
+
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1),
+                       equalTo(1.));
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues2),
+                       equalTo(2.));
+
+               taskMetricGroup2.close(); // the sample of the first task must stay exported
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1),
+                       equalTo(1.));
+
+               taskMetricGroup1.close(); // both groups closed: the sample must no longer be found
+               assertThat(CollectorRegistry.defaultRegistry.getSampleValue("flink_taskmanager_job_task_my_counter", LABEL_NAMES, labelValues1),
+                       nullValue());
+       }
+
+       private String[] addToArray(String[] array, String element) {
+               final String[] extended = Arrays.copyOf(array, array.length + 1); // sized from the argument itself, not from LABEL_NAMES
+               extended[array.length] = element;
+               return extended;
+       }
+
+       @After
+       public void shutdownRegistry() {
+               registry.shutdown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 83b7b41..956339b 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -47,13 +47,13 @@ import org.junit.rules.ExpectedException;
 import java.util.Arrays;
 
 import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
-import static 
org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 
 /**
- * Test for {@link PrometheusReporter}.
+ * Basic test for {@link PrometheusReporter}.
  */
 public class PrometheusReporterTest extends TestLogger {
        private static final int NON_DEFAULT_PORT = 9429;
@@ -70,22 +70,21 @@ public class PrometheusReporterTest extends TestLogger {
        @Rule
        public ExpectedException thrown = ExpectedException.none();
 
-       private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter()));
+       private final MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "" + NON_DEFAULT_PORT)));
+       private final FrontMetricGroup<TaskManagerMetricGroup> metricGroup = 
new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, 
TASK_MANAGER));
        private final MetricReporter reporter = registry.getReporters().get(0);
 
+       /**
+        * {@link io.prometheus.client.Counter} may not decrease, so report 
{@link Counter} as {@link io.prometheus.client.Gauge}.
+        *
+        * @throws UnirestException Might be thrown on HTTP problems.
+        */
        @Test
        public void counterIsReportedAsPrometheusGauge() throws 
UnirestException {
-               //Prometheus counters may not decrease
                Counter testCounter = new SimpleCounter();
                testCounter.inc(7);
 
-               String counterName = "testCounter";
-               String gaugeName = SCOPE_PREFIX + counterName;
-
-               assertThat(addMetricAndPollResponse(testCounter, counterName),
-                       equalTo(HELP_PREFIX + gaugeName + " " + 
getFullMetricName(counterName) + "\n" +
-                               TYPE_PREFIX + gaugeName + " gauge" + "\n" +
-                               gaugeName + DEFAULT_LABELS + " 7.0" + "\n"));
+               assertThatGaugeIsExported(testCounter, "testCounter", "7.0");
        }
 
        @Test
@@ -97,13 +96,34 @@ public class PrometheusReporterTest extends TestLogger {
                        }
                };
 
-               String gaugeName = "testGauge";
-               String prometheusGaugeName = SCOPE_PREFIX + gaugeName;
+               assertThatGaugeIsExported(testGauge, "testGauge", "1.0");
+       }
+
+       @Test
+       public void nullGaugeDoesNotBreakReporter() throws UnirestException {
+               Gauge<Integer> testGauge = new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return null;
+                       }
+               };
+
+               assertThatGaugeIsExported(testGauge, "testGauge", "0.0");
+       }
+
+       @Test
+       public void meterRateIsReportedAsPrometheusGauge() throws 
UnirestException {
+               Meter testMeter = new TestMeter();
+
+               assertThatGaugeIsExported(testMeter, "testMeter", "5.0");
+       }
 
-               assertThat(addMetricAndPollResponse(testGauge, gaugeName),
-                       equalTo(HELP_PREFIX + prometheusGaugeName + " " + 
getFullMetricName(gaugeName) + "\n" +
-                               TYPE_PREFIX + prometheusGaugeName + " gauge" + 
"\n" +
-                               prometheusGaugeName + DEFAULT_LABELS + " 1.0" + 
"\n"));
+       private void assertThatGaugeIsExported(Metric metric, String name, 
String expectedValue) throws UnirestException {
+               final String prometheusName = SCOPE_PREFIX + name;
+               assertThat(addMetricAndPollResponse(metric, name),
+                       containsString(HELP_PREFIX + prometheusName + " " + 
name + " (scope: taskmanager)\n" +
+                               TYPE_PREFIX + prometheusName + " gauge" + "\n" +
+                               prometheusName + DEFAULT_LABELS + " " + 
expectedValue + "\n"));
        }
 
        @Test
@@ -114,7 +134,7 @@ public class PrometheusReporterTest extends TestLogger {
                String summaryName = SCOPE_PREFIX + histogramName;
 
                String response = addMetricAndPollResponse(testHistogram, 
histogramName);
-               assertThat(response, containsString(HELP_PREFIX + summaryName + 
" " + getFullMetricName(histogramName) + "\n" +
+               assertThat(response, containsString(HELP_PREFIX + summaryName + 
" " + histogramName + " (scope: taskmanager)\n" +
                        TYPE_PREFIX + summaryName + " summary" + "\n" +
                        summaryName + "_count" + DEFAULT_LABELS + " 1.0" + 
"\n"));
                for (String quantile : Arrays.asList("0.5", "0.75", "0.95", 
"0.98", "0.99", "0.999")) {
@@ -124,19 +144,6 @@ public class PrometheusReporterTest extends TestLogger {
        }
 
        @Test
-       public void meterRateIsReportedAsPrometheusGauge() throws 
UnirestException {
-               Meter testMeter = new TestMeter();
-
-               String meterName = "testMeter";
-               String counterName = SCOPE_PREFIX + meterName;
-
-               assertThat(addMetricAndPollResponse(testMeter, meterName),
-                       equalTo(HELP_PREFIX + counterName + " " + 
getFullMetricName(meterName) + "\n" +
-                               TYPE_PREFIX + counterName + " gauge" + "\n" +
-                               counterName + DEFAULT_LABELS + " 5.0" + "\n"));
-       }
-
-       @Test
        public void endpointIsUnavailableAfterReporterIsClosed() throws 
UnirestException {
                reporter.close();
                thrown.expect(UnirestException.class);
@@ -160,25 +167,119 @@ public class PrometheusReporterTest extends TestLogger {
                
assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), 
equalTo("a___:__b___:__c"));
        }
 
+       @Test
+       public void doubleGaugeIsConvertedCorrectly() {
+               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Double>() {
+                       @Override
+                       public Double getValue() {
+                               return 3.14;
+                       }
+               }).get(), equalTo(3.14));
+       }
+
+       @Test
+       public void shortGaugeIsConvertedCorrectly() {
+               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Short>() {
+                       @Override
+                       public Short getValue() {
+                               return 13;
+                       }
+               }).get(), equalTo(13.));
+       }
+
+       @Test
+       public void booleanGaugeIsConvertedCorrectly() {
+               assertThat(PrometheusReporter.gaugeFrom(new Gauge<Boolean>() {
+                       @Override
+                       public Boolean getValue() {
+                               return true;
+                       }
+               }).get(), equalTo(1.));
+       }
+
+       /**
+        * Prometheus only supports numbers, so report non-numeric gauges as 0.
+        */
+       @Test
+       public void stringGaugeCannotBeConverted() {
+               assertThat(PrometheusReporter.gaugeFrom(new Gauge<String>() {
+                       @Override
+                       public String getValue() {
+                               return "I am not a number";
+                       }
+               }).get(), equalTo(0.));
+       }
+
+       @Test
+       public void registeringSameMetricTwiceDoesNotThrowException() {
+               Counter counter = new SimpleCounter();
+               counter.inc();
+               String counterName = "testCounter";
+
+               reporter.notifyOfAddedMetric(counter, counterName, metricGroup);
+               reporter.notifyOfAddedMetric(counter, counterName, metricGroup);
+       }
+
+       @Test
+       public void addingUnknownMetricTypeDoesNotThrowException(){
+               class SomeMetricType implements Metric{}
+
+               reporter.notifyOfAddedMetric(new SomeMetricType(), "name", 
metricGroup);
+       }
+
+       @Test
+       public void cannotStartTwoReportersOnSamePort() {
+               final MetricRegistry fixedPort1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "12345")));
+               final MetricRegistry fixedPort2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "12345")));
+
+               assertThat(fixedPort1.getReporters(), hasSize(1));
+               assertThat(fixedPort2.getReporters(), hasSize(0));
+
+               fixedPort1.shutdown();
+               fixedPort2.shutdown();
+       }
+
+       @Test
+       public void canStartTwoReportersWhenUsingPortRange() {
+               final MetricRegistry portRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9249-9252")));
+               final MetricRegistry portRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9249-9252")));
+
+               assertThat(portRange1.getReporters(), hasSize(1));
+               assertThat(portRange2.getReporters(), hasSize(1));
+
+               portRange1.shutdown();
+               portRange2.shutdown();
+       }
+
+       @Test
+       public void cannotStartThreeReportersWhenPortRangeIsTooSmall() {
+               final MetricRegistry smallPortRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9253-9254")));
+               final MetricRegistry smallPortRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9253-9254")));
+               final MetricRegistry smallPortRange3 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test3",
 "9253-9254")));
+
+               assertThat(smallPortRange1.getReporters(), hasSize(1));
+               assertThat(smallPortRange2.getReporters(), hasSize(1));
+               assertThat(smallPortRange3.getReporters(), hasSize(0));
+
+               smallPortRange1.shutdown();
+               smallPortRange2.shutdown();
+               smallPortRange3.shutdown();
+       }
+
        private String addMetricAndPollResponse(Metric metric, String 
metricName) throws UnirestException {
-               reporter.notifyOfAddedMetric(metric, metricName, new 
FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, 
TASK_MANAGER)));
+               reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
                return pollMetrics().getBody();
        }
 
-       private static HttpResponse<String> pollMetrics() throws 
UnirestException {
+       static HttpResponse<String> pollMetrics() throws UnirestException {
                return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
        }
 
-       private static String getFullMetricName(String metricName) {
-               return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + 
SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName;
-       }
-
-       private static Configuration createConfigWithOneReporter() {
+       static Configuration createConfigWithOneReporter(String reporterName, 
String portString) {
                Configuration cfg = new Configuration();
-               cfg.setString(MetricOptions.REPORTERS_LIST, "test1");
-               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." +
-                       ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getName());
-               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ARG_PORT, "" + NON_DEFAULT_PORT);
+               cfg.setString(MetricOptions.REPORTERS_LIST, reporterName);
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getName());
+               cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
reporterName + "." + ARG_PORT, portString);
                return cfg;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56017a98/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b96f84c..9384a48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,7 @@ under the License.
                <curator.version>2.12.0</curator.version>
                <jackson.version>2.7.4</jackson.version>
                <metrics.version>3.1.5</metrics.version>
-               <prometheus.version>0.0.21</prometheus.version>
+               <prometheus.version>0.0.26</prometheus.version>
                <junit.version>4.12</junit.version>
                <mockito.version>1.10.19</mockito.version>
                <powermock.version>1.6.5</powermock.version>

Reply via email to