GitHub user zentol commented on the issue:
https://github.com/apache/flink/pull/4647
Tried it out and it works as expected.
After looking at the aggregation code in detail, I suggest setting an
isComplete boolean for each counter instead of setting the value to -1. This
makes things more explicit and preserves the existing behavior of happily
aggregating the counters. It also simplifies the addition of
bytesInLocal/-Remote.
I.e., the aggregation part looks like this:
```
String numBytesInRemoteString =
    metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE);
if (numBytesInRemoteString == null) {
    // Metric missing for this subtask: flag the aggregate as incomplete.
    this.numBytesInRemoteComplete = false;
} else {
    this.numBytesInRemote += Long.parseLong(numBytesInRemoteString);
}
```
and the writing like this:
```
public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
    gen.writeObjectFieldStart("metrics");
    long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
    writeIOMetricWithCompleteness(gen, "read-bytes", numBytesIn,
        this.numBytesInLocalComplete && this.numBytesInRemoteComplete);
    writeIOMetricWithCompleteness(gen, "write-bytes", this.numBytesOut,
        this.numBytesOutComplete);
    writeIOMetricWithCompleteness(gen, "read-records", this.numRecordsIn,
        this.numRecordsInComplete);
    writeIOMetricWithCompleteness(gen, "write-records", this.numRecordsOut,
        this.numRecordsOutComplete);
    gen.writeEndObject();
}

private void writeIOMetricWithCompleteness(JsonGenerator gen, String fieldName,
        long fieldValue, boolean isComplete) throws IOException {
    gen.writeNumberField(fieldName, fieldValue);
    gen.writeBooleanField(fieldName + "-complete", isComplete);
}
```
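One detail the snippets above leave implicit: each completeness flag would have to start out as true and only ever be flipped to false. A minimal, self-contained sketch of that for a single counter follows. The class name, the method names, and the plain Map standing in for the metric store are all made up for illustration, and the "numBytesInRemote" key is an assumed value, not taken from the PR.

```java
import java.util.Map;

// Sketch only: a plain Map stands in for the real metric store, and all
// names in this class are hypothetical.
class IOMetricsSketch {
    // Assumed key for illustration; the real code uses a MetricNames constant.
    private static final String NUM_BYTES_IN_REMOTE = "numBytesInRemote";

    private long numBytesInRemote;
    // Starts as true; becomes false once any subtask lacks the metric.
    private boolean numBytesInRemoteComplete = true;

    /** Folds one subtask's metrics into the running aggregate. */
    void addSubtask(Map<String, String> metrics) {
        String value = metrics.get(NUM_BYTES_IN_REMOTE);
        if (value == null) {
            // Keep summing the other subtasks, but remember the gap.
            numBytesInRemoteComplete = false;
        } else {
            numBytesInRemote += Long.parseLong(value);
        }
    }

    long getNumBytesInRemote() {
        return numBytesInRemote;
    }

    boolean isNumBytesInRemoteComplete() {
        return numBytesInRemoteComplete;
    }
}
```

With this, a subtask that has not reported the metric yet no longer poisons the sum: the counter keeps the partial total and the flag tells the consumer that the total is a lower bound.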
What do you think, @jameslafa?
---