[
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064732#comment-16064732
]
ASF GitHub Bot commented on FLINK-7009:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4188#discussion_r124255898
--- Diff:
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
---
@@ -179,41 +254,130 @@ private String prefix(String ... names) {
}
}
- private void send(final String name, final String value) {
+ private String buildStatsdLine(final String name, final String value,
final String tags) {
+ Double number;
try {
- String formatted = String.format("%s:%s|g", name,
value);
- byte[] data =
formatted.getBytes(StandardCharsets.UTF_8);
- socket.send(new DatagramPacket(data, data.length,
this.address));
+ number = Double.parseDouble(value);
+ }
+ catch (NumberFormatException e) {
+ // quietly skip values like "n/a"
+ return "";
}
- catch (IOException e) {
- LOG.error("unable to send packet to statsd at '{}:{}'",
address.getHostName(), address.getPort());
+ if (number >= 0.) {
+ return String.format("%s:%s|g%s", name, value, tags !=
null ? tags : "");
+ } else {
+ // quietly skip "unknowns" like
lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN
+ return "";
}
}
- @Override
- public String filterCharacters(String input) {
+ private void send(final String name, final String value, final String
tags) {
+ String formatted = buildStatsdLine(name, value, tags);
+ if (formatted.length() > 0) {
+ try {
+ byte[] data =
formatted.getBytes(StandardCharsets.UTF_8);
+ socket.send(new DatagramPacket(data,
data.length, this.address));
+ }
+ catch (IOException e) {
+ LOG.error("unable to send packet to statsd at
'{}:{}'", address.getHostName(), address.getPort());
+ }
+ }
+ }
+
+ /**
+ * dogstatsd names should: start with letter, uses ascii alphanumerics
and underscore, separated by periods.
+ * Collapse runs of invalid characters into an underscore. Discard
invalid prefix and suffix.
+ * Eg: ":::metric:::name:::" -> "metric_name"
+ */
+
+ private boolean isValidStatsdChar(char c) {
+ return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c
>= '0' && c <= '9') || (c == '_');
+ }
+
+ private String filterNCharacters(String input, int limit) {
char[] chars = null;
final int strLen = input.length();
int pos = 0;
+ boolean insertFiller = false;
- for (int i = 0; i < strLen; i++) {
+ for (int i = 0; i < strLen && pos < limit; i++) {
final char c = input.charAt(i);
- switch (c) {
- case ':':
- if (chars == null) {
- chars = input.toCharArray();
+ if (isValidStatsdChar(c)) {
+ if (chars != null) {
+ // skip invalid suffix, only fill if
followed by valid character
+ if (insertFiller) {
+ chars[pos++] = '_';
+ insertFiller = false;
}
- chars[pos++] = '-';
- break;
+ chars[pos] = c;
+ }
+ pos++;
+ } else {
+ if (chars == null) {
+ chars = input.toCharArray();
+ }
+ // skip invalid prefix, until pos > 0
+ if (pos > 0) {
+ // collapse sequence of invalid char
into one filler
+ insertFiller = true;
+ }
+ }
+ }
- default:
- if (chars != null) {
- chars[pos] = c;
- }
- pos++;
+ if (chars == null) {
+ if (strLen > limit) {
+ return input.substring(0, limit);
+ } else {
+ return input; // happy path, input is entirely
valid and under the limit
}
+ } else {
+ return new String(chars, 0, pos);
}
+ }
- return chars == null ? input : new String(chars, 0, pos);
+ /**
+ * filterCharacters() is called on each delimited segment of the metric.
+ *
+ * <p>We might get a string that has coded structures, references to
instances of serializers and reducers, and even if
+ * we normalize all the odd characters could be overly long for a
metric name, likely to be truncated downstream.
+ * Our choices here appear to be either to discard invalid metrics, or
to pragmatically handle each of the various
+ * issues and produce something that might be useful in aggregate even
though the named parts are hard to read.
+ *
+ * <p>This function will find and remove all object references like
@abcd0123, so that task and operator names are stable.
+ * The name of an operator should be the same every time it is run, so
we should ignore object hash ids like these.
+ *
+ * <p>If the segment is a tm_id, task_id, job_id, task_attempt_id, we
can optionally trim those to the first 8 chars.
+ * This can reduce overall length substantially while still preserving
enough to distinguish metrics from each other.
+ *
+ * <p>If the segment is 50 chars or longer, we will compress it to
avoid truncation. The compression will look like the
+ * first 10 valid chars followed by a hash of the original. This
sacrifices readability for utility as a metric, so
+ * that latency metrics might survive with valid and useful dimensions
for aggregation, even if it is very hard to
+ * reverse engineer the particular operator name. Application
developers can of course still supply their own names
+ * and are not forced to rely on the defaults.
+ *
+ * <p>This will turn something like:
+ * "TriggerWindow(TumblingProcessingTimeWindows(5000),
ReducingStateDescriptor{serializer=org.apache.flink.api.java
+ * .typeutils.runtime.PojoSerializer@f3395ffa,
reduceFunction=org.apache.flink.streaming.examples.socket.
+ * SocketWindowWordCount$1@4201c465},
ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))"
+ *
+ * <p>into: "TriggerWin_c2910b88"
+ */
+ @Override
+ public String filterCharacters(String input) {
+ // remove instance references
+ Matcher hasRefs = instanceRef.matcher(input);
+ if (hasRefs.find()) {
+ input = hasRefs.replaceAll("");
+ }
+ // compress segments too long
+ if (input.length() >= 50) {
+ return filterNCharacters(input, 10) + "_" +
Integer.toHexString(input.hashCode());
+ }
+ int limit = Integer.MAX_VALUE;
+ // optionally shrink flink ids
+ if (shortIds && input.length() == 32 &&
flinkId.matcher(input).matches()) {
--- End diff --
I would like to point out that this is an opt-in feature. Also, given that the
IDs can be controlled by the user (`SingleOutputStreamOperator#setUid`), you can
guarantee uniqueness even with 8 characters.
> dogstatsd mode in statsd reporter
> ---------------------------------
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
> Reporter: David Brinegar
> Fix For: 1.4.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing
> to the manner in which Flink variables are handled, mainly around invalid
> characters and overly long metric names. As an option, it would be quite useful to
> have a stricter dogstatsd compliant output. Dogstatsd metrics are tagged,
> should be less than 200 characters including tag names and values, be
> alphanumeric + underbar, delimited by periods. As a further pragmatic
> restriction, negative and other invalid values should be ignored rather than
> sent to the backend. These restrictions play well with a broad set of
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by
> periods. Runs of invalid characters within a metric segment would be
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic
> representation of the metric name, to preserve the unique metric time series
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id,
> task_attempt_id, to the first 8 characters, again to preserve enough
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters
> plus a hash of the long name. For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references. The stable version would be:
>
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000),
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
> ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point
> would be to preserve the metrics so they are valid, avoid truncation, and can
> be aggregated along other dimensions even if this particular dimension is
> hard to parse after the compression.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)