GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4188#discussion_r124255898
--- Diff:
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
---
@@ -179,41 +254,130 @@ private String prefix(String ... names) {
}
}
- private void send(final String name, final String value) {
+ private String buildStatsdLine(final String name, final String value, final String tags) {
+ Double number;
try {
- String formatted = String.format("%s:%s|g", name, value);
- byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
- socket.send(new DatagramPacket(data, data.length, this.address));
+ number = Double.parseDouble(value);
+ }
+ catch (NumberFormatException e) {
+ // quietly skip values like "n/a"
+ return "";
}
- catch (IOException e) {
- LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
+ if (number >= 0.) {
+ return String.format("%s:%s|g%s", name, value, tags != null ? tags : "");
+ } else {
+ // quietly skip "unknowns" like lowWaterMark:-9223372036854775808, or JVM.Memory.NonHeap.Max:-1, or NaN
+ return "";
}
}
- @Override
- public String filterCharacters(String input) {
+ private void send(final String name, final String value, final String tags) {
+ String formatted = buildStatsdLine(name, value, tags);
+ if (formatted.length() > 0) {
+ try {
+ byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
+ socket.send(new DatagramPacket(data, data.length, this.address));
+ }
+ catch (IOException e) {
+ LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
+ }
+ }
+ }
+
+ /**
+ * dogstatsd names should: start with letter, uses ascii alphanumerics and underscore, separated by periods.
+ * Collapse runs of invalid characters into an underscore. Discard invalid prefix and suffix.
+ * Eg: ":::metric:::name:::" -> "metric_name"
+ */
+
+ private boolean isValidStatsdChar(char c) {
+ return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c == '_');
+ }
+
+ private String filterNCharacters(String input, int limit) {
char[] chars = null;
final int strLen = input.length();
int pos = 0;
+ boolean insertFiller = false;
- for (int i = 0; i < strLen; i++) {
+ for (int i = 0; i < strLen && pos < limit; i++) {
final char c = input.charAt(i);
- switch (c) {
- case ':':
- if (chars == null) {
- chars = input.toCharArray();
+ if (isValidStatsdChar(c)) {
+ if (chars != null) {
+ // skip invalid suffix, only fill if followed by valid character
+ if (insertFiller) {
+ chars[pos++] = '_';
+ insertFiller = false;
}
- chars[pos++] = '-';
- break;
+ chars[pos] = c;
+ }
+ pos++;
+ } else {
+ if (chars == null) {
+ chars = input.toCharArray();
+ }
+ // skip invalid prefix, until pos > 0
+ if (pos > 0) {
+ // collapse sequence of invalid char into one filler
+ insertFiller = true;
+ }
+ }
+ }
- default:
- if (chars != null) {
- chars[pos] = c;
- }
- pos++;
+ if (chars == null) {
+ if (strLen > limit) {
+ return input.substring(0, limit);
+ } else {
+ return input; // happy path, input is entirely valid and under the limit
}
+ } else {
+ return new String(chars, 0, pos);
}
+ }
- return chars == null ? input : new String(chars, 0, pos);
+ /**
+ * filterCharacters() is called on each delimited segment of the metric.
+ *
+ * <p>We might get a string that has coded structures, references to instances of serializers and reducers, and even if
+ * we normalize all the odd characters could be overly long for a metric name, likely to be truncated downstream.
+ * Our choices here appear to be either to discard invalid metrics, or to pragmatically handle each of the various
+ * issues and produce something that might be useful in aggregate even though the named parts are hard to read.
+ *
+ * <p>This function will find and remove all object references like @abcd0123, so that task and operator names are stable.
+ * The name of an operator should be the same every time it is run, so we should ignore object hash ids like these.
+ *
+ * <p>If the segment is a tm_id, task_id, job_id, task_attempt_id, we can optionally trim those to the first 8 chars.
+ * This can reduce overall length substantially while still preserving enough to distinguish metrics from each other.
+ *
+ * <p>If the segment is 50 chars or longer, we will compress it to avoid truncation. The compression will look like the
+ * first 10 valid chars followed by a hash of the original. This sacrifices readability for utility as a metric, so
+ * that latency metrics might survive with valid and useful dimensions for aggregation, even if it is very hard to
+ * reverse engineer the particular operator name. Application developers can of course still supply their own names
+ * and are not forced to rely on the defaults.
+ *
+ * <p>This will turn something like:
+ * "TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java
+ * .typeutils.runtime.PojoSerializer@f3395ffa, reduceFunction=org.apache.flink.streaming.examples.socket.
+ * SocketWindowWordCount$1@4201c465}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))"
+ *
+ * <p>into: "TriggerWin_c2910b88"
+ */
+ @Override
+ public String filterCharacters(String input) {
+ // remove instance references
+ Matcher hasRefs = instanceRef.matcher(input);
+ if (hasRefs.find()) {
+ input = hasRefs.replaceAll("");
+ }
+ // compress segments too long
+ if (input.length() >= 50) {
+ return filterNCharacters(input, 10) + "_" + Integer.toHexString(input.hashCode());
+ }
+ int limit = Integer.MAX_VALUE;
+ // optionally shrink flink ids
+ if (shortIds && input.length() == 32 && flinkId.matcher(input).matches()) {
--- End diff --
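The two transformations in the hunk above can be sketched as a standalone Java class: `gaugeLine` mirrors `buildStatsdLine` (emit a `name:value|g` gauge only for non-negative numeric values), and `sanitize`/`filterSegment` mirror `filterNCharacters`/`filterCharacters`. This is a minimal sketch under stated assumptions, not the PR's code: `StatsdSegmentSketch` and its method names are hypothetical; the `INSTANCE_REF` regex is a made-up stand-in for the `instanceRef` field, whose definition is not in the diff; the tail of `filterCharacters` is cut off, so the final `sanitize` call for short segments is assumed; and the `tags` argument is assumed to already carry its DogStatsD `|#` prefix.

```java
import java.util.regex.Pattern;

public class StatsdSegmentSketch {

    // Hypothetical stand-in for the diff's instanceRef pattern (not shown there):
    // matches object references such as "@f3395ffa".
    private static final Pattern INSTANCE_REF = Pattern.compile("@[0-9a-fA-F]+");

    // Mirrors buildStatsdLine: returns "" for non-numeric, negative, or NaN values.
    static String gaugeLine(String name, String value, String tags) {
        double number;
        try {
            number = Double.parseDouble(value);
        } catch (NumberFormatException e) {
            return ""; // e.g. "n/a"
        }
        // NaN >= 0 is false, so NaN is dropped along with sentinels such as -1.
        if (number >= 0.0) {
            return name + ":" + value + "|g" + (tags != null ? tags : "");
        }
        return "";
    }

    static boolean isValidChar(char c) {
        return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                || (c >= '0' && c <= '9') || c == '_';
    }

    // Keeps at most `limit` characters, collapsing each run of invalid characters
    // into a single '_' and dropping any invalid prefix and suffix.
    static String sanitize(String input, int limit) {
        StringBuilder out = new StringBuilder();
        boolean pendingFiller = false;
        for (int i = 0; i < input.length() && out.length() < limit; i++) {
            char c = input.charAt(i);
            if (isValidChar(c)) {
                if (pendingFiller) {
                    if (out.length() + 2 > limit) {
                        break; // no room for both "_" and the next valid character
                    }
                    out.append('_');
                    pendingFiller = false;
                }
                out.append(c);
            } else if (out.length() > 0) {
                pendingFiller = true; // emitted only if a valid character follows
            }
        }
        return out.toString();
    }

    // Mirrors filterCharacters without the shortIds branch.
    static String filterSegment(String input) {
        String stripped = INSTANCE_REF.matcher(input).replaceAll("");
        if (stripped.length() >= 50) {
            return sanitize(stripped, 10) + "_" + Integer.toHexString(stripped.hashCode());
        }
        return sanitize(stripped, Integer.MAX_VALUE); // assumed: the diff is cut off here
    }

    public static void main(String[] args) {
        System.out.println(gaugeLine("latency", "3.5", "|#host:tm1")); // latency:3.5|g|#host:tm1
        System.out.println(gaugeLine("lowWaterMark", "-9223372036854775808", null).isEmpty()); // true
        System.out.println(sanitize(":::metric:::name:::", Integer.MAX_VALUE)); // metric_name
        System.out.println(filterSegment("PojoSerializer@f3395ffa")); // PojoSerializer
    }
}
```

One deliberate difference: `sanitize` checks that there is room for both the `_` filler and the character after it, so its output never exceeds `limit`. In the diff, `filterNCharacters` only checks `pos < limit` at the top of the loop and then writes the filler and the next character, so it can return `limit + 1` characters (for example `abcdefghi_j` for a limit of 10).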
I would like to point out that the ID shortening is an opt-in feature. Also, given that the IDs can be controlled by the user (`SingleOutputStreamOperator#setUid`), you can guarantee uniqueness even with 8 characters.
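The opt-in behaviour discussed in the comment can be sketched as below. The diff is cut off inside the `shortIds` branch, so both the `FLINK_ID` pattern (assumed here to be 32 lowercase hex characters, consistent with the diff's `length() == 32` check) and the 8-character limit applied inside the branch are assumptions taken from the Javadoc's "trim those to the first 8 chars", not from visible code. `ShortIdSketch`, `limitFor`, and `shorten` are hypothetical names.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of the opt-in ID shortening; not the PR's code. */
public class ShortIdSketch {

    // Assumed stand-in for the diff's flinkId pattern: 32 lowercase hex characters.
    private static final Pattern FLINK_ID = Pattern.compile("[0-9a-f]{32}");

    // Assumed: inside the (cut-off) shortIds branch the limit drops to 8 characters.
    static int limitFor(String segment, boolean shortIds) {
        if (shortIds && segment.length() == 32 && FLINK_ID.matcher(segment).matches()) {
            return 8;
        }
        return Integer.MAX_VALUE;
    }

    // Only an ID-shaped segment with shortIds enabled gets a limit below its length,
    // so every other segment passes through unchanged.
    static String shorten(String segment, boolean shortIds) {
        int limit = limitFor(segment, shortIds);
        return segment.length() > limit ? segment.substring(0, limit) : segment;
    }

    public static void main(String[] args) {
        String jobId = "a3f1c2d4e5b60718293a4b5c6d7e8f90"; // made-up example ID
        System.out.println(shorten(jobId, false)); // a3f1c2d4e5b60718293a4b5c6d7e8f90
        System.out.println(shorten(jobId, true));  // a3f1c2d4
    }
}
```

With the flag off, every segment passes through unchanged; only users who opt in see the 8-character form, which is the case the uniqueness argument above addresses.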