[
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064527#comment-16064527
]
ASF GitHub Bot commented on FLINK-7009:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4188#discussion_r124222210
--- Diff: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---
@@ -90,6 +109,45 @@ public void close() {
	// ------------------------------------------------------------------------
+	/**
+	 * Removes leading and trailing angle brackets.
+	 */
+	private String stripBrackets(String str) {
+		return str.substring(1, str.length() - 1);
+	}
+
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+		if (dogstatsdMode) {
+			// memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val"
+			StringBuilder statsdTagLine = new StringBuilder();
+			Map<String, String> orderedTags = new TreeMap<>(group.getAllVariables());
+			for (Map.Entry<String, String> entry : orderedTags.entrySet()) {
+				String k = stripBrackets(entry.getKey());
+				String v = filterCharacters(entry.getValue());
+				statsdTagLine.append(",").append(k).append(":").append(v);
+			}
+			if (statsdTagLine.length() > 0) {
+				// remove first comma, prefix with "|#"
+				tagTable.put(metric, "|#" + statsdTagLine.substring(1));
--- End diff --
Let's store the tags directly in the maps containing the metrics.
I've seen this requirement a few times now, so let's add a new
`AbstractReporter` class that has a generic argument for the information
stored in the map for each metric.
It would look like this:
```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractReporterV2<T> implements MetricReporter, CharacterFilter {

	protected final Logger log = LoggerFactory.getLogger(getClass());

	protected final Map<Gauge<?>, T> gauges = new HashMap<>();
	protected final Map<Counter, T> counters = new HashMap<>();
	protected final Map<Histogram, T> histograms = new HashMap<>();
	protected final Map<Meter, T> meters = new HashMap<>();

	protected abstract T getMetricInfo(Metric metric, String metricName, MetricGroup group);

	@Override
	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
		T metricInfo = getMetricInfo(metric, metricName, group);
		synchronized (this) {
			if (metric instanceof Counter) {
				counters.put((Counter) metric, metricInfo);
			} else if (metric instanceof Gauge) {
				gauges.put((Gauge<?>) metric, metricInfo);
			} else if (metric instanceof Histogram) {
				histograms.put((Histogram) metric, metricInfo);
			} else if (metric instanceof Meter) {
				meters.put((Meter) metric, metricInfo);
			} else {
				log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
					"does not support this metric type.", metric.getClass().getName());
			}
		}
	}

	@Override
	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
		synchronized (this) {
			if (metric instanceof Counter) {
				counters.remove(metric);
			} else if (metric instanceof Gauge) {
				gauges.remove(metric);
			} else if (metric instanceof Histogram) {
				histograms.remove(metric);
			} else if (metric instanceof Meter) {
				meters.remove(metric);
			} else {
				log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
					"does not support this metric type.", metric.getClass().getName());
			}
		}
	}
}
```
You then wouldn't need to override `notifyOfAddedMetric`/`notifyOfRemovedMetric`,
and you'd save one HashMap plus one lookup per metric per report.
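For reference, the tag-line construction from the diff can be exercised in isolation. A minimal self-contained sketch, using only `java.util`, with the reporter's `filterCharacters` reduced to an identity stand-in (an assumption; the real reporter escapes invalid characters):

```java
import java.util.Map;
import java.util.TreeMap;

public class DogstatsdTagLine {

	// Stand-in for the reporter's filterCharacters(); identity here by assumption.
	static String filterCharacters(String s) {
		return s;
	}

	// Removes leading and trailing angle brackets, as in the diff above.
	static String stripBrackets(String str) {
		return str.substring(1, str.length() - 1);
	}

	// Builds the memoized dogstatsd tag section "|#tag:val,tag:val",
	// or "" when there are no variables. TreeMap gives a stable tag order.
	static String tagLine(Map<String, String> variables) {
		StringBuilder statsdTagLine = new StringBuilder();
		for (Map.Entry<String, String> entry : new TreeMap<>(variables).entrySet()) {
			statsdTagLine.append(',')
				.append(stripBrackets(entry.getKey()))
				.append(':')
				.append(filterCharacters(entry.getValue()));
		}
		// remove first comma, prefix with "|#"
		return statsdTagLine.length() > 0 ? "|#" + statsdTagLine.substring(1) : "";
	}

	public static void main(String[] args) {
		Map<String, String> vars = new TreeMap<>();
		vars.put("<host>", "h1");
		vars.put("<job_id>", "abc");
		System.out.println(tagLine(vars)); // |#host:h1,job_id:abc
	}
}
```

With the proposed `AbstractReporterV2<String>`, this string would be what `getMetricInfo` returns and what the maps memoize per metric.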
> dogstatsd mode in statsd reporter
> ---------------------------------
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
> Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics, owing
> to the manner in which Flink variables are handled, mainly around invalid
> characters and metric names that are too long. As an option, it would be quite
> useful to have a stricter dogstatsd-compliant output. Dogstatsd metrics are
> tagged, should be less than 200 characters including tag names and values, and
> should be alphanumeric plus underbar, delimited by periods. As a further
> pragmatic restriction, negative and other invalid values should be ignored
> rather than sent to the backend. These restrictions play well with a broad set
> of collectors and time series databases.
> This mode would:
> * convert output to ASCII alphanumeric characters plus underbar, delimited by
> periods. Runs of invalid characters within a metric segment would be
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic
> representation of the metric name, to preserve the unique metric time series
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id,
> task_attempt_id, to the first 8 characters, again to preserve enough
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters
> plus a hash of the long name. For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references. The stable version would be:
>
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names; the main point is
> to keep the metrics valid, avoid truncation, and allow them to be aggregated
> along other dimensions even if this particular dimension is hard to parse
> after compression.
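The compression described in the quoted issue (first ten valid characters plus a hash of the stable name) can be sketched concretely. This is a hedged sketch: the issue does not fix a hash function, so CRC32 is assumed here and the resulting hex suffix will not match the `d8c007da` of the example:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class MetricNameCompressor {

	// Keeps the first ten characters valid in a metric segment
	// (alphanumeric or underbar), then appends a hex hash of the full
	// stable name. CRC32 is an assumption; the issue only says
	// "a hash of the long name".
	static String compress(String stableName) {
		StringBuilder prefix = new StringBuilder();
		for (char c : stableName.toCharArray()) {
			if (Character.isLetterOrDigit(c) || c == '_') {
				prefix.append(c);
				if (prefix.length() == 10) {
					break;
				}
			}
		}
		CRC32 crc = new CRC32();
		crc.update(stableName.getBytes(StandardCharsets.UTF_8));
		return prefix + "_" + Long.toHexString(crc.getValue());
	}

	public static void main(String[] args) {
		// Prints "TriggerWin_" followed by the CRC32 of the stable name.
		System.out.println(compress(
			"TriggerWindow(TumblingProcessingTimeWindows(5000))"));
	}
}
```

Hashing the stable name (after instance references are dropped) rather than the raw name is what keeps the compressed metric identical across JVM restarts.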
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)