This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7aafc4c51b0314efbfabb94c5efea330914a60c7
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Dec 31 08:39:04 2020 +0100

    [FLINK-19174][metrics] Fix idle and backpressured time accuracy on long sleeps
    
    In particular, if a task is idling forever because no new records are incoming,
    the previous version would report idleTime as 0% and busyTime as 100%, which is
    incorrect.
    
    In this version, the idleTime metric is aware that an idling period has started
    and can take that into account when updating its value.
---
 .../flink/metrics/slf4j/Slf4jReporterTest.java     |  4 +-
 .../partition/BufferWritingResultPartition.java    | 13 ++--
 .../apache/flink/runtime/metrics/TimerGauge.java   | 82 +++++++++++++++++++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 16 ++--
 .../io/network/partition/ResultPartitionTest.java  |  3 +-
 .../flink/runtime/metrics/TimerGaugeTest.java      | 86 ++++++++++++++++++++++
 .../metrics/groups/TaskIOMetricGroupTest.java      | 15 +++-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 16 ++--
 8 files changed, 205 insertions(+), 30 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index 98ff8d5..e526b96 100644
--- 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -48,6 +48,7 @@ import java.util.Collections;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -125,8 +126,9 @@ public class Slf4jReporterTest extends TestLogger {
     public void testAddGauge() throws Exception {
         String gaugeName = "gauge";
 
+        int gaugesBefore = reporter.getGauges().size();
         taskMetricGroup.gauge(gaugeName, null);
-        assertTrue(reporter.getGauges().isEmpty());
+        assertThat(reporter.getGauges().size(), is(gaugesBefore));
 
         Gauge<Long> gauge = () -> null;
         taskMetricGroup.gauge(gaugeName, gauge);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index c5e1138..bc3278d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -19,15 +19,13 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.metrics.TimerGauge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -64,7 +62,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
     /** For broadcast mode, a single BufferBuilder is shared by all 
subpartitions. */
     private BufferBuilder broadcastBufferBuilder;
 
-    private Meter backPressuredTimeMsPerSecond = new MeterView(new 
SimpleCounter());
+    private TimerGauge backPressuredTimeMsPerSecond = new TimerGauge();
 
     public BufferWritingResultPartition(
             String owningTaskName,
@@ -335,11 +333,10 @@ public abstract class BufferWritingResultPartition 
extends ResultPartition {
             return bufferBuilder;
         }
 
+        backPressuredTimeMsPerSecond.markStart();
         try {
-            long start = System.currentTimeMillis();
             bufferBuilder = 
bufferPool.requestBufferBuilderBlocking(targetSubpartition);
-            long backPressuredTime = System.currentTimeMillis() - start;
-            backPressuredTimeMsPerSecond.markEvent(backPressuredTime);
+            backPressuredTimeMsPerSecond.markEnd();
             return bufferBuilder;
         } catch (InterruptedException e) {
             throw new IOException("Interrupted while waiting for buffer");
@@ -378,7 +375,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
     }
 
     @VisibleForTesting
-    public Meter getBackPressuredTimeMsPerSecond() {
+    public TimerGauge getBackPressuredTimeMsPerSecond() {
         return backPressuredTimeMsPerSecond;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
new file mode 100644
index 0000000..67394c0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+/**
+ * {@link TimerGauge} measures how much time is spent in a given state, with entry into that state
+ * being signaled by {@link #markStart()}. Measuring is stopped by {@link #markEnd()}. In
+ * particular, this class handles the case where {@link #update()} is called while a measurement
+ * is still in progress: even if the next {@link #markEnd()} call is hours away, the returned
+ * value accounts for this ongoing measurement.
+ */
+public class TimerGauge implements Gauge<Long>, View {
+    private final Clock clock;
+
+    private long previousCount;
+    private long currentCount;
+    private long currentMeasurementStart;
+
+    public TimerGauge() {
+        this(SystemClock.getInstance());
+    }
+
+    public TimerGauge(Clock clock) {
+        this.clock = clock;
+    }
+
+    public synchronized void markStart() {
+        if (currentMeasurementStart == 0) {
+            currentMeasurementStart = clock.absoluteTimeMillis();
+        }
+    }
+
+    public synchronized void markEnd() {
+        if (currentMeasurementStart != 0) {
+            currentCount += clock.absoluteTimeMillis() - 
currentMeasurementStart;
+            currentMeasurementStart = 0;
+        }
+    }
+
+    @Override
+    public synchronized void update() {
+        if (currentMeasurementStart != 0) {
+            long now = clock.absoluteTimeMillis();
+            currentCount += now - currentMeasurementStart;
+            currentMeasurementStart = now;
+        }
+        previousCount = Math.max(Math.min(currentCount / 
UPDATE_INTERVAL_SECONDS, 1000), 0);
+        currentCount = 0;
+    }
+
+    @Override
+    public synchronized Long getValue() {
+        return previousCount;
+    }
+
+    @VisibleForTesting
+    public synchronized long getCount() {
+        return currentCount;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 5ce4508..f0a0e10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.TimerGauge;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -46,9 +47,9 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     private final Meter numRecordsInRate;
     private final Meter numRecordsOutRate;
     private final Meter numBuffersOutRate;
-    private final Meter idleTimePerSecond;
+    private final TimerGauge idleTimePerSecond;
     private final Gauge busyTimePerSecond;
-    private final Meter backPressuredTimePerSecond;
+    private final TimerGauge backPressuredTimePerSecond;
 
     private volatile boolean busyTimeEnabled;
 
@@ -71,10 +72,9 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         this.numBuffersOutRate =
                 meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new 
MeterView(numBuffersOut));
 
-        this.idleTimePerSecond =
-                meter(MetricNames.TASK_IDLE_TIME, new MeterView(new 
SimpleCounter()));
+        this.idleTimePerSecond = gauge(MetricNames.TASK_IDLE_TIME, new 
TimerGauge());
         this.backPressuredTimePerSecond =
-                meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new 
SimpleCounter()));
+                gauge(MetricNames.TASK_BACK_PRESSURED_TIME, new TimerGauge());
         this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, 
this::getBusyTimePerSecond);
     }
 
@@ -106,11 +106,11 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         return numBuffersOut;
     }
 
-    public Meter getIdleTimeMsPerSecond() {
+    public TimerGauge getIdleTimeMsPerSecond() {
         return idleTimePerSecond;
     }
 
-    public Meter getBackPressuredTimePerSecond() {
+    public TimerGauge getBackPressuredTimePerSecond() {
         return backPressuredTimePerSecond;
     }
 
@@ -119,7 +119,7 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     }
 
     private double getBusyTimePerSecond() {
-        double busyTime = idleTimePerSecond.getRate() + 
backPressuredTimePerSecond.getRate();
+        double busyTime = idleTimePerSecond.getValue() + 
backPressuredTimePerSecond.getValue();
         return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : 
Double.NaN;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 059c815..09bd4c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -50,6 +50,7 @@ import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.c
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -510,7 +511,7 @@ public class ResultPartitionTest {
         assertNotNull(buffer);
 
         // back-pressured time is zero when there is buffer available.
-        assertEquals(0, 
resultPartition.getBackPressuredTimeMsPerSecond().getCount());
+        
assertThat(resultPartition.getBackPressuredTimeMsPerSecond().getValue(), 
equalTo(0L));
 
         CountDownLatch syncLock = new CountDownLatch(1);
         final Thread requestThread =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
new file mode 100644
index 0000000..4d96e0f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ----------------------------------------------------------------------------
+//  Tests for TimerGauge. Every test drives a ManualClock, so the measured
+//  durations are deterministic and independent of wall-clock time.
+//
+//  Each test starts the clock at a non-zero time: TimerGauge treats a
+//  measurement start of 0 as "no measurement in progress".
+// ----------------------------------------------------------------------------
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for {@link TimerGauge}. */
+public class TimerGaugeTest {
+    private static final long SLEEP = 10;
+
+    @Test
+    public void testBasicUsage() {
+        ManualClock clock = new ManualClock(42_000_000);
+        TimerGauge gauge = new TimerGauge(clock);
+
+        gauge.update();
+        assertThat(gauge.getValue(), is(0L));
+
+        gauge.markStart();
+        clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
+        gauge.markEnd();
+        gauge.update();
+
+        assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / 
View.UPDATE_INTERVAL_SECONDS));
+    }
+
+    @Test
+    public void testUpdateWithoutMarkingEnd() {
+        ManualClock clock = new ManualClock(42_000_000);
+        TimerGauge gauge = new TimerGauge(clock);
+
+        gauge.markStart();
+        clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
+        gauge.update();
+
+        assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / 
View.UPDATE_INTERVAL_SECONDS));
+    }
+
+    @Test
+    public void testGetWithoutUpdate() {
+        ManualClock clock = new ManualClock(42_000_000);
+        TimerGauge gauge = new TimerGauge(clock);
+
+        gauge.markStart();
+        clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
+
+        assertThat(gauge.getValue(), is(0L));
+
+        gauge.markEnd();
+
+        assertThat(gauge.getValue(), is(0L));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 7892a68..5ae20c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -24,13 +24,15 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 
 import org.junit.Test;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 /** Tests for the {@link TaskIOMetricGroup}. */
 public class TaskIOMetricGroupTest {
     @Test
-    public void testTaskIOMetricGroup() {
+    public void testTaskIOMetricGroup() throws InterruptedException {
         TaskMetricGroup task = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
         TaskIOMetricGroup taskIO = task.getIOMetricGroup();
 
@@ -52,7 +54,12 @@ public class TaskIOMetricGroupTest {
         taskIO.getNumBytesInCounter().inc(100L);
         taskIO.getNumBytesOutCounter().inc(250L);
         taskIO.getNumBuffersOutCounter().inc(3L);
-        taskIO.getIdleTimeMsPerSecond().markEvent(2L);
+        taskIO.getIdleTimeMsPerSecond().markStart();
+        taskIO.getBackPressuredTimePerSecond().markStart();
+        long sleepTime = 2L;
+        Thread.sleep(sleepTime);
+        taskIO.getIdleTimeMsPerSecond().markEnd();
+        taskIO.getBackPressuredTimePerSecond().markEnd();
 
         IOMetrics io = taskIO.createSnapshot();
         assertEquals(32L, io.getNumRecordsIn());
@@ -60,6 +67,8 @@ public class TaskIOMetricGroupTest {
         assertEquals(100L, io.getNumBytesIn());
         assertEquals(250L, io.getNumBytesOut());
         assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
-        assertEquals(2L, taskIO.getIdleTimeMsPerSecond().getCount());
+        assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), 
greaterThanOrEqualTo(sleepTime));
+        assertThat(
+                taskIO.getBackPressuredTimePerSecond().getCount(), 
greaterThanOrEqualTo(sleepTime));
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c94c43d..86f13da 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -47,6 +46,7 @@ import 
org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.TimerGauge;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -406,8 +406,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         }
 
         TaskIOMetricGroup ioMetrics = 
getEnvironment().getMetricGroup().getIOMetricGroup();
-        final long startTime = System.currentTimeMillis();
-        Meter timer;
+        TimerGauge timer;
         CompletableFuture<?> resumeFuture;
         if (!recordWriter.isAvailable()) {
             timer = ioMetrics.getBackPressuredTimePerSecond();
@@ -417,7 +416,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
             resumeFuture = inputProcessor.getAvailableFuture();
         }
         assertNoException(
-                resumeFuture.thenRun(new 
ResumeWrapper(controller.suspendDefaultAction(), timer, startTime)));
+                resumeFuture.thenRun(new 
ResumeWrapper(controller.suspendDefaultAction(), timer)));
     }
 
     private void resetSynchronousSavepointId() {
@@ -1323,18 +1322,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
     private static class ResumeWrapper implements Runnable {
         private final Suspension suspendedDefaultAction;
-        private final Meter timer;
-        private final long startTime;
+        private final TimerGauge timer;
 
-        public ResumeWrapper(Suspension suspendedDefaultAction, Meter timer, 
long startTime) {
+        public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge 
timer) {
             this.suspendedDefaultAction = suspendedDefaultAction;
+            timer.markStart();
             this.timer = timer;
-            this.startTime = startTime;
         }
 
         @Override
         public void run() {
-            timer.markEvent(System.currentTimeMillis() - startTime);
+            timer.markEnd();
             suspendedDefaultAction.resume();
         }
     }

Reply via email to