Repository: flink
Updated Branches:
  refs/heads/release-1.5 e1fcd458a -> 903a32336


[FLINK-9665][metrics] Unregister individual metrics in PrometheusReporter

This closes #6239.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/903a3233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/903a3233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/903a3233

Branch: refs/heads/release-1.5
Commit: 903a323366b91a6f2f471f067462df9dd20cd3f4
Parents: e1fcd458
Author: Jelmer Kuperus <jkupe...@marktplaats.nl>
Authored: Wed Jun 27 01:05:11 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Jul 3 10:09:56 2018 +0200

----------------------------------------------------------------------
 .../metrics/prometheus/PrometheusReporter.java  | 28 ++++++++++++++++++++
 .../prometheus/PrometheusReporterTest.java      | 24 +++++++++++++++++
 2 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/903a3233/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index 48fd8a4..ffa419c 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -193,13 +193,37 @@ public class PrometheusReporter implements MetricReporter {
                }
        }
 
+       private static void removeMetric(Metric metric, List<String> 
dimensionValues, Collector collector) {
+               if (metric instanceof Gauge) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Counter) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Meter) {
+                       ((io.prometheus.client.Gauge) 
collector).remove(toArray(dimensionValues));
+               } else if (metric instanceof Histogram) {
+                       ((HistogramSummaryProxy) 
collector).remove(dimensionValues);
+               } else {
+                       LOG.warn("Cannot remove unknown metric type: {}. This 
indicates that the metric type is not supported by this reporter.",
+                               metric.getClass().getName());
+               }
+       }
+
        @Override
        public void notifyOfRemovedMetric(final Metric metric, final String 
metricName, final MetricGroup group) {
+
+               List<String> dimensionValues = new LinkedList<>();
+               for (final Map.Entry<String, String> dimension : 
group.getAllVariables().entrySet()) {
+                       
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+               }
+
                final String scopedMetricName = getScopedName(metricName, 
group);
                synchronized (this) {
                        final AbstractMap.SimpleImmutableEntry<Collector, 
Integer> collectorWithCount = 
collectorsWithCountByMetricName.get(scopedMetricName);
                        final Integer count = collectorWithCount.getValue();
                        final Collector collector = collectorWithCount.getKey();
+
+                       removeMetric(metric, dimensionValues, collector);
+
                        if (count == 1) {
                                try {
                                        
CollectorRegistry.defaultRegistry.unregister(collector);
@@ -295,6 +319,10 @@ public class PrometheusReporter implements MetricReporter {
                        histogramsByLabelValues.put(labelValues, histogram);
                }
 
+               void remove(final List<String> labelValues) {
+                       histogramsByLabelValues.remove(labelValues);
+               }
+
                private void addSamples(final List<String> labelValues, final 
Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
                        samples.add(new MetricFamilySamples.Sample(metricName + 
"_count",
                                labelNamesWithQuantile.subList(0, 
labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));

http://git-wip-us.apache.org/repos/asf/flink/blob/903a3233/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index e9fd985..592c246 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
@@ -31,6 +32,7 @@ import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
 
@@ -51,6 +53,7 @@ import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -160,6 +163,27 @@ public class PrometheusReporterTest extends TestLogger {
        }
 
        @Test
+       public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws 
UnirestException {
+               TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER);
+
+               String metricName = "metric";
+
+               Counter metric1 = new SimpleCounter();
+               FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup1 = new 
FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, 
JobID.generate(), "job_1"));
+               reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);
+
+               Counter metric2 = new SimpleCounter();
+               FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup2 = new 
FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, 
JobID.generate(), "job_2"));
+               reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
+
+               reporter.notifyOfRemovedMetric(metric1, metricName, 
metricGroup1);
+
+               String response = pollMetrics(reporter.getPort()).getBody();
+
+               assertThat(response, not(containsString("job_1")));
+       }
+
+       @Test
        public void invalidCharactersAreReplacedWithUnderscore() {
                assertThat(PrometheusReporter.replaceInvalidChars(""), 
equalTo(""));
                assertThat(PrometheusReporter.replaceInvalidChars("abc"), 
equalTo("abc"));

Reply via email to