Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4188#discussion_r124222210
--- Diff:
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
---
@@ -90,6 +109,45 @@ public void close() {
//
------------------------------------------------------------------------
+ /**
+ * Removes leading and trailing angle brackets.
+ */
+ private String stripBrackets(String str) {
+ return str.substring(1, str.length() - 1);
+ }
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName,
MetricGroup group) {
+ if (dogstatsdMode) {
+ // memoize dogstatsd tag section:
"|#tag:val,tag:val,tag:val"
+ StringBuilder statsdTagLine = new StringBuilder();
+ Map<String, String> orderedTags = new
TreeMap<>(group.getAllVariables());
+ for (Map.Entry<String, String> entry:
orderedTags.entrySet()) {
+ String k = stripBrackets(entry.getKey());
+ String v = filterCharacters(entry.getValue());
+
statsdTagLine.append(",").append(k).append(":").append(v);
+ }
+ if (statsdTagLine.length() > 0) {
+ // remove first comma, prefix with "|#"
+ tagTable.put(metric, "|#" +
statsdTagLine.substring(1));
--- End diff --
Let's store the tags directly in the maps containing the metrics.
I've seen this requirement a few times now, so let's add a new
`AbstractReporter` class that has a generic argument for the information that
is stored in the map for each metric.
This would look like this:
```
public abstract class AbstractReporterV2<T> implements MetricReporter,
CharacterFilter {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final Map<Gauge<?>, T> gauges = new HashMap<>();
protected final Map<Counter, T> counters = new HashMap<>();
protected final Map<Histogram, T> histograms = new HashMap<>();
protected final Map<Meter, T> meters = new HashMap<>();
protected abstract T getMetricInfo(Metric metric, String metricName,
MetricGroup group);
@Override
public void notifyOfAddedMetric(Metric metric, String metricName,
MetricGroup group) {
T metricInfo = getMetricInfo(metric, metricName, group);
synchronized (this) {
if (metric instanceof Counter) {
counters.put((Counter) metric, metricInfo);
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, metricInfo);
} else if (metric instanceof Histogram) {
histograms.put((Histogram) metric, metricInfo);
} else if (metric instanceof Meter) {
meters.put((Meter) metric, metricInfo);
} else {
log.warn("Cannot add unknown metric type {}.
This indicates that the reporter " +
"does not support this metric type.",
metric.getClass().getName());
}
}
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName,
MetricGroup group) {
synchronized (this) {
if (metric instanceof Counter) {
counters.remove(metric);
} else if (metric instanceof Gauge) {
gauges.remove(metric);
} else if (metric instanceof Histogram) {
histograms.remove(metric);
} else if (metric instanceof Meter) {
meters.remove(metric);
} else {
log.warn("Cannot remove unknown metric type {}.
This indicates that the reporter " +
"does not support this metric type.",
metric.getClass().getName());
}
}
}
}
```
You then wouldn't need to override notifyOfAdded/Removed metric and save 1
HashMap + 1 lookup per metric per report.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---