Repository: flink
Updated Branches:
  refs/heads/release-1.5 e1fcd458a -> 903a32336

[FLINK-9665][metrics] Unregister individual metrics in PrometheusReporter

This closes #6239.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/903a3233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/903a3233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/903a3233

Branch: refs/heads/release-1.5
Commit: 903a323366b91a6f2f471f067462df9dd20cd3f4
Parents: e1fcd458
Author: Jelmer Kuperus <jkupe...@marktplaats.nl>
Authored: Wed Jun 27 01:05:11 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Jul 3 10:09:56 2018 +0200

----------------------------------------------------------------------
 .../metrics/prometheus/PrometheusReporter.java  | 28 ++++++++++++++++++++
 .../prometheus/PrometheusReporterTest.java      | 24 +++++++++++++++++
 2 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/903a3233/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
index 48fd8a4..ffa419c 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -193,13 +193,37 @@ public class PrometheusReporter implements MetricReporter {
 		}
 	}
 
+	private static void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
+		if (metric instanceof Gauge) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Counter) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Meter) {
+			((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+		} else if (metric instanceof Histogram) {
+			((HistogramSummaryProxy) collector).remove(dimensionValues);
+		} else {
+			LOG.warn("Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+		}
+	}
+
 	@Override
 	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
 		final String scopedMetricName = getScopedName(metricName, group);
 		synchronized (this) {
 			final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
 			final Integer count = collectorWithCount.getValue();
 			final Collector collector = collectorWithCount.getKey();
+
+			removeMetric(metric, dimensionValues, collector);
+
 			if (count == 1) {
 				try {
 					CollectorRegistry.defaultRegistry.unregister(collector);
@@ -295,6 +319,10 @@ public class PrometheusReporter implements MetricReporter {
 			histogramsByLabelValues.put(labelValues, histogram);
 		}
 
+		void remove(final List<String> labelValues) {
+			histogramsByLabelValues.remove(labelValues);
+		}
+
 		private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
 			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
 				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));

http://git-wip-us.apache.org/repos/asf/flink/blob/903a3233/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index e9fd985..592c246 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
@@ -31,6 +32,7 @@ import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
 
@@ -51,6 +53,7 @@ import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -160,6 +163,27 @@ public class PrometheusReporterTest extends TestLogger {
 	}
 
 	@Test
+	public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestException {
+		TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER);
+
+		String metricName = "metric";
+
+		Counter metric1 = new SimpleCounter();
+		FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup1 = new FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_1"));
+		reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);
+
+		Counter metric2 = new SimpleCounter();
+		FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup2 = new FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_2"));
+		reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
+
+		reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1);
+
+		String response = pollMetrics(reporter.getPort()).getBody();
+
+		assertThat(response, not(containsString("job_1")));
+	}
+
+	@Test
 	public void invalidCharactersAreReplacedWithUnderscore() {
 		assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
 		assertThat(PrometheusReporter.replaceInvalidChars("abc"), equalTo("abc"));