GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4647#discussion_r138133547
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
---
@@ -72,13 +72,54 @@ public void addIOMetrics(AccessExecution attempt,
@Nullable MetricFetcher fetche
} else { // execAttempt is still running, use MetricQueryService instead
if (fetcher != null) {
fetcher.update();
- MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
- if (metrics != null) {
- this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
- this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
- this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
- this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
- this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+ MetricStore metricStore = fetcher.getMetricStore();
+ synchronized (metricStore) {
+ MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+ if (metrics != null) {
+ /**
+ * We want to keep track of missing metrics to be able to make a difference between 0 as a value
+ * and a missing value.
+ * In case a metric is missing for a parallel instance of a task, we initialize if with -1 and
+ * will be considered as incomplete
--- End diff --
Missing period, and the sentence flow is a bit awkward. How about "and
consider it as incomplete."?
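
For readers following along, the idea described in the Javadoc under review (telling a real 0 apart from a missing value by defaulting to -1) could look roughly like the sketch below. This is only an illustration under assumptions: `IoMetricsSketch`, its fields, and the plain `Map` standing in for the subtask metric store are hypothetical and are not the actual `MutableIOMetrics` code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; not the actual Flink MutableIOMetrics class. */
class IoMetricsSketch {
    // Sentinel for "metric not reported"; assumes real values are never negative.
    private static final long MISSING = -1L;
    private static final String NUM_BYTES_OUT = "numBytesOut";

    private long numBytesOut;
    private boolean numBytesOutComplete = true;

    /** Adds one subtask's value; a missing entry marks the total as incomplete. */
    void addSubtask(Map<String, String> subtaskMetrics) {
        // Default to "-1" instead of "0" so a missing metric is distinguishable from a real 0.
        long value = Long.parseLong(subtaskMetrics.getOrDefault(NUM_BYTES_OUT, "-1"));
        if (value == MISSING) {
            numBytesOutComplete = false;
        } else {
            numBytesOut += value;
        }
    }

    long getNumBytesOut() {
        return numBytesOut;
    }

    boolean isNumBytesOutComplete() {
        return numBytesOutComplete;
    }

    public static void main(String[] args) {
        IoMetricsSketch metrics = new IoMetricsSketch();

        Map<String, String> reporting = new HashMap<>();
        reporting.put(NUM_BYTES_OUT, "10");
        metrics.addSubtask(reporting);

        // A subtask that has not reported the metric yet.
        metrics.addSubtask(new HashMap<>());

        System.out.println(metrics.getNumBytesOut() + " complete=" + metrics.isNumBytesOutComplete());
    }
}
```

With this shape, a caller can still display the partial sum while flagging it as incomplete, rather than silently treating the missing subtask as having contributed 0.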
---