Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4586#discussion_r144804293
--- Diff:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
---
@@ -114,39 +120,78 @@ public void notifyOfAddedMetric(final Metric metric,
final String metricName, fi
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
}
- final String validMetricName = scope + SCOPE_SEPARATOR +
CHARACTER_FILTER.filterCharacters(metricName);
- final String metricIdentifier =
group.getMetricIdentifier(metricName);
+ final String scopedMetricName = getScopedName(metricName,
group);
+ final String helpString = metricName + " (scope: " +
getLogicalScope(group) + ")";
+
final Collector collector;
- if (metric instanceof Gauge) {
- collector = createGauge((Gauge) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
- } else if (metric instanceof Counter) {
- collector = createGauge((Counter) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
- } else if (metric instanceof Meter) {
- collector = createGauge((Meter) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
- } else if (metric instanceof Histogram) {
- collector = createSummary((Histogram) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ Integer count = 0;
+
+ if
(!collectorsWithCountByMetricName.containsKey(scopedMetricName)) {
+ if (metric instanceof Gauge) {
+ collector = newGauge(scopedMetricName,
helpString, dimensionKeys, dimensionValues, gaugeFrom((Gauge) metric));
+ } else if (metric instanceof Counter) {
+ collector = newGauge(scopedMetricName,
helpString, dimensionKeys, dimensionValues, gaugeFrom((Counter) metric));
+ } else if (metric instanceof Meter) {
+ collector = newGauge(scopedMetricName,
helpString, dimensionKeys, dimensionValues, gaugeFrom((Meter) metric));
+ } else if (metric instanceof Histogram) {
+ collector = new
HistogramSummaryProxy((Histogram) metric, scopedMetricName, helpString,
dimensionKeys, dimensionValues);
+ } else {
+ LOG.warn("Cannot add unknown metric type: {}.
This indicates that the metric type is not supported by this reporter.",
+ metric.getClass().getName());
+ return;
+ }
+ try {
+ collector.register();
+ } catch (Exception e) {
+ LOG.warn("There was a problem registering
metric {}.", metricName, e);
+ }
} else {
- LOG.warn("Cannot add unknown metric type: {}. This
indicates that the metric type is not supported by this reporter.",
- metric.getClass().getName());
- return;
+ final AbstractMap.SimpleImmutableEntry<Collector,
Integer> collectorWithCount =
collectorsWithCountByMetricName.get(scopedMetricName);
+ collector = collectorWithCount.getKey();
+ count = collectorWithCount.getValue();
+ if (metric instanceof Gauge) {
+ ((io.prometheus.client.Gauge)
collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
+ } else if (metric instanceof Counter) {
+ ((io.prometheus.client.Gauge)
collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+ } else if (metric instanceof Meter) {
+ ((io.prometheus.client.Gauge)
collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+ } else if (metric instanceof Histogram) {
+ ((HistogramSummaryProxy)
collector).addChild((Histogram) metric, dimensionValues);
+ }
}
- collector.register();
- collectorsByMetricName.put(metricName, collector);
+ collectorsWithCountByMetricName.put(scopedMetricName, new
AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
+ }
+
+ private static String getScopedName(String metricName, MetricGroup
group) {
+ return SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR
+ CHARACTER_FILTER.filterCharacters(metricName);
}
@Override
public void notifyOfRemovedMetric(final Metric metric, final String
metricName, final MetricGroup group) {
-
CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
- collectorsByMetricName.remove(metricName);
+ final String scopedMetricName = getScopedName(metricName,
group);
+ final AbstractMap.SimpleImmutableEntry<Collector, Integer>
collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
+ final Integer count = collectorWithCount.getValue();
+ final Collector collector = collectorWithCount.getKey();
+ if (count == 1) {
--- End diff --
You need a separate synchronization mechanism here to prevent
race conditions between adding and removing metrics. A metric can be
swallowed if it is added between retrieving the collector and removing it
from the map.
---