Paymahn created FLINK-32950:
-------------------------------
Summary: statsd reporter does not follow spec for counters
Key: FLINK-32950
URL: https://issues.apache.org/jira/browse/FLINK-32950
Project: Flink
Issue Type: Bug
Components: Runtime / Metrics
Reporter: Paymahn
The [statsd|https://github.com/statsd/statsd/blob/master/docs/metric_types.md]
spec says the following:
> At each flush the current count is sent and reset to 0.
The Flink [statsd
reporter|https://github.com/apache/flink/blob/e5f78352a29df0d4dfe0c34369193896e7a1b4be/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L129-L131]
does not reset the counter to 0 after each flush. Instead, it reports
cumulative values. This deviates from the spec and causes issues for downstream
clients that consume these statsd metrics, since statsd treats each counter
value it receives as an increment.
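To illustrate the effect, here is a small standalone sketch (not Flink code; the class and method names are made up for this example). It assumes a counter whose running total is 5, 8 and 12 at three consecutive flushes, and a consumer that adds up every counter value it receives.

```java
import java.util.ArrayList;
import java.util.List;

public class CounterReportingDemo {
    /** Values sent if the reporter forwards the running total at each flush. */
    static List<Long> cumulativeReports(long[] totalsAtFlush) {
        List<Long> out = new ArrayList<>();
        for (long total : totalsAtFlush) {
            out.add(total);
        }
        return out;
    }

    /** Values sent if the reporter forwards only the increase since the previous flush. */
    static List<Long> deltaReports(long[] totalsAtFlush) {
        List<Long> out = new ArrayList<>();
        long previous = 0L;
        for (long total : totalsAtFlush) {
            out.add(total - previous);
            previous = total;
        }
        return out;
    }

    /** A statsd-style consumer treats each counter value as an increment and sums them. */
    static long serverTotal(List<Long> reports) {
        long sum = 0L;
        for (long report : reports) {
            sum += report;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] totals = {5L, 8L, 12L};
        System.out.println(serverTotal(cumulativeReports(totals))); // prints 25
        System.out.println(serverTotal(deltaReports(totals)));      // prints 12
    }
}
```

With cumulative reporting the consumer ends up at 25 even though only 12 events occurred; with delta reporting it ends up at 12.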
One possible fix would be to add the following as a class variable
{code:java}
protected final Map<Counter, Long> lastKnownCounterValues = new ConcurrentHashMap<>();{code}
and then modify the {{reportCounter}} function like so
{code:java}
private void reportCounter(final DMetric metric, final Counter counter) {
    // The statsd protocol says that counters should be reset to 0 after flushing:
    // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting
    // We don't want to actually change the value of the counter because it
    // could have unintended consequences. Instead, we keep track of the last
    // known value and report the delta.
    long curCount = counter.getCount();
    long lastKnownCount = this.lastKnownCounterValues.getOrDefault(counter, 0L);
    send(metric.getName(), curCount - lastKnownCount, DMetricType.COUNTER, metric.getTags());
    this.lastKnownCounterValues.put(counter, curCount);
}{code}
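One thing the snippet above leaves open is cleanup: entries in the map would stay around after a counter is unregistered. Below is a rough, self-contained sketch of the same delta bookkeeping with a removal hook. {{CounterDeltaTracker}}, {{nextDelta}}, {{forget}} and {{trackedCount}} are hypothetical names for this example, not existing Flink API; in the reporter, {{forget}} would presumably be called from {{notifyOfRemovedMetric}}.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the last reported total per key and returns deltas. */
public class CounterDeltaTracker<K> {
    private final Map<K, Long> lastReported = new ConcurrentHashMap<>();

    /** Records the new total and returns the increase since the previous call for this key. */
    public long nextDelta(K key, long currentCount) {
        // put() returns the previously stored total, or null on the first report.
        Long previous = lastReported.put(key, currentCount);
        return currentCount - (previous == null ? 0L : previous);
    }

    /** Drops the remembered total, e.g. when the metric is unregistered. */
    public void forget(K key) {
        lastReported.remove(key);
    }

    /** Number of keys currently tracked; mainly useful for checking cleanup. */
    public int trackedCount() {
        return lastReported.size();
    }
}
```

The first call for a key returns the full current count, which matches the {{getOrDefault(counter, 0L)}} behaviour in the proposal above.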