This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aafc4c51b0314efbfabb94c5efea330914a60c7 Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Dec 31 08:39:04 2020 +0100 [FLINK-19174][metrics] Fix idle and backpressured time accuracy on long sleeps In particular, if a task is idling forever because there are no new incoming records, the previous version would report idleTime as 0% and busyTime as 100%, which is incorrect. In this version, the idleTime metric is aware that an idling period has started and can take that into account when updating its value. --- .../flink/metrics/slf4j/Slf4jReporterTest.java | 4 +- .../partition/BufferWritingResultPartition.java | 13 ++-- .../apache/flink/runtime/metrics/TimerGauge.java | 82 +++++++++++++++++++++ .../runtime/metrics/groups/TaskIOMetricGroup.java | 16 ++-- .../io/network/partition/ResultPartitionTest.java | 3 +- .../flink/runtime/metrics/TimerGaugeTest.java | 86 ++++++++++++++++++++++ .../metrics/groups/TaskIOMetricGroupTest.java | 15 +++- .../flink/streaming/runtime/tasks/StreamTask.java | 16 ++-- 8 files changed, 205 insertions(+), 30 deletions(-) diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java index 98ff8d5..e526b96 100644 --- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java +++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java @@ -48,6 +48,7 @@ import java.util.Collections; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -125,8 +126,9 @@ public class Slf4jReporterTest extends TestLogger { public void testAddGauge() throws Exception { String gaugeName = "gauge"; + 
int gaugesBefore = reporter.getGauges().size(); taskMetricGroup.gauge(gaugeName, null); - assertTrue(reporter.getGauges().isEmpty()); + assertThat(reporter.getGauges().size(), is(gaugesBefore)); Gauge<Long> gauge = () -> null; taskMetricGroup.gauge(gaugeName, gauge); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index c5e1138..bc3278d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -19,15 +19,13 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.metrics.TimerGauge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; @@ -64,7 +62,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { /** For broadcast mode, a single BufferBuilder is shared by all subpartitions. 
*/ private BufferBuilder broadcastBufferBuilder; - private Meter backPressuredTimeMsPerSecond = new MeterView(new SimpleCounter()); + private TimerGauge backPressuredTimeMsPerSecond = new TimerGauge(); public BufferWritingResultPartition( String owningTaskName, @@ -335,11 +333,10 @@ public abstract class BufferWritingResultPartition extends ResultPartition { return bufferBuilder; } + backPressuredTimeMsPerSecond.markStart(); try { - long start = System.currentTimeMillis(); bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition); - long backPressuredTime = System.currentTimeMillis() - start; - backPressuredTimeMsPerSecond.markEvent(backPressuredTime); + backPressuredTimeMsPerSecond.markEnd(); return bufferBuilder; } catch (InterruptedException e) { throw new IOException("Interrupted while waiting for buffer"); @@ -378,7 +375,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { } @VisibleForTesting - public Meter getBackPressuredTimeMsPerSecond() { + public TimerGauge getBackPressuredTimeMsPerSecond() { return backPressuredTimeMsPerSecond; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java new file mode 100644 index 0000000..67394c0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.View; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; + +/** + * {@link TimerGauge} measures how much time is spent in a given state, with entry into that state + * being signaled by {@link #markStart()}. Measuring is stopped by {@link #markEnd()}. This class in + * particularly takes care of the case, when {@link #update()} is called when some measurement + * started but has not yet finished. For example even if next {@link #markEnd()} call is expected to + * happen in a couple of hours, the returned value will account for this ongoing measurement. 
+ */ +public class TimerGauge implements Gauge<Long>, View { + private final Clock clock; + + private long previousCount; + private long currentCount; + private long currentMeasurementStart; + + public TimerGauge() { + this(SystemClock.getInstance()); + } + + public TimerGauge(Clock clock) { + this.clock = clock; + } + + public synchronized void markStart() { + if (currentMeasurementStart == 0) { + currentMeasurementStart = clock.absoluteTimeMillis(); + } + } + + public synchronized void markEnd() { + if (currentMeasurementStart != 0) { + currentCount += clock.absoluteTimeMillis() - currentMeasurementStart; + currentMeasurementStart = 0; + } + } + + @Override + public synchronized void update() { + if (currentMeasurementStart != 0) { + long now = clock.absoluteTimeMillis(); + currentCount += now - currentMeasurementStart; + currentMeasurementStart = now; + } + previousCount = Math.max(Math.min(currentCount / UPDATE_INTERVAL_SECONDS, 1000), 0); + currentCount = 0; + } + + @Override + public synchronized Long getValue() { + return previousCount; + } + + @VisibleForTesting + public synchronized long getCount() { + return currentCount; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 5ce4508..f0a0e10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -25,6 +25,7 @@ import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.TimerGauge; import java.util.ArrayList; import java.util.List; @@ -46,9 +47,9 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { 
private final Meter numRecordsInRate; private final Meter numRecordsOutRate; private final Meter numBuffersOutRate; - private final Meter idleTimePerSecond; + private final TimerGauge idleTimePerSecond; private final Gauge busyTimePerSecond; - private final Meter backPressuredTimePerSecond; + private final TimerGauge backPressuredTimePerSecond; private volatile boolean busyTimeEnabled; @@ -71,10 +72,9 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { this.numBuffersOutRate = meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut)); - this.idleTimePerSecond = - meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter())); + this.idleTimePerSecond = gauge(MetricNames.TASK_IDLE_TIME, new TimerGauge()); this.backPressuredTimePerSecond = - meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter())); + gauge(MetricNames.TASK_BACK_PRESSURED_TIME, new TimerGauge()); this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond); } @@ -106,11 +106,11 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return numBuffersOut; } - public Meter getIdleTimeMsPerSecond() { + public TimerGauge getIdleTimeMsPerSecond() { return idleTimePerSecond; } - public Meter getBackPressuredTimePerSecond() { + public TimerGauge getBackPressuredTimePerSecond() { return backPressuredTimePerSecond; } @@ -119,7 +119,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { } private double getBusyTimePerSecond() { - double busyTime = idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate(); + double busyTime = idleTimePerSecond.getValue() + backPressuredTimePerSecond.getValue(); return busyTimeEnabled ? 
1000.0 - Math.min(busyTime, 1000.0) : Double.NaN; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 059c815..09bd4c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -50,6 +50,7 @@ import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.c import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -510,7 +511,7 @@ public class ResultPartitionTest { assertNotNull(buffer); // back-pressured time is zero when there is buffer available. - assertEquals(0, resultPartition.getBackPressuredTimeMsPerSecond().getCount()); + assertThat(resultPartition.getBackPressuredTimeMsPerSecond().getValue(), equalTo(0L)); CountDownLatch syncLock = new CountDownLatch(1); final Thread requestThread = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java new file mode 100644 index 0000000..4d96e0f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ---------------------------------------------------------------------------- +// This class is largely adapted from "com.google.common.base.Preconditions", +// which is part of the "Guava" library. +// +// Because of frequent issues with dependency conflicts, this class was +// added to the Flink code base to reduce dependency on Guava. +// ---------------------------------------------------------------------------- + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.metrics.View; +import org.apache.flink.util.clock.ManualClock; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for {@link TimerGauge}. 
*/ +public class TimerGaugeTest { + private static final long SLEEP = 10; + + @Test + public void testBasicUsage() { + ManualClock clock = new ManualClock(42_000_000); + TimerGauge gauge = new TimerGauge(clock); + + gauge.update(); + assertThat(gauge.getValue(), is(0L)); + + gauge.markStart(); + clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS); + gauge.markEnd(); + gauge.update(); + + assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS)); + } + + @Test + public void testUpdateWithoutMarkingEnd() { + ManualClock clock = new ManualClock(42_000_000); + TimerGauge gauge = new TimerGauge(clock); + + gauge.markStart(); + clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS); + gauge.update(); + + assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS)); + } + + @Test + public void testGetWithoutUpdate() { + ManualClock clock = new ManualClock(42_000_000); + TimerGauge gauge = new TimerGauge(clock); + + gauge.markStart(); + clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS); + + assertThat(gauge.getValue(), is(0L)); + + gauge.markEnd(); + + assertThat(gauge.getValue(), is(0L)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index 7892a68..5ae20c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -24,13 +24,15 @@ import org.apache.flink.runtime.executiongraph.IOMetrics; import org.junit.Test; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; /** Tests for the {@link TaskIOMetricGroup}. 
*/ public class TaskIOMetricGroupTest { @Test - public void testTaskIOMetricGroup() { + public void testTaskIOMetricGroup() throws InterruptedException { TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); TaskIOMetricGroup taskIO = task.getIOMetricGroup(); @@ -52,7 +54,12 @@ public class TaskIOMetricGroupTest { taskIO.getNumBytesInCounter().inc(100L); taskIO.getNumBytesOutCounter().inc(250L); taskIO.getNumBuffersOutCounter().inc(3L); - taskIO.getIdleTimeMsPerSecond().markEvent(2L); + taskIO.getIdleTimeMsPerSecond().markStart(); + taskIO.getBackPressuredTimePerSecond().markStart(); + long sleepTime = 2L; + Thread.sleep(sleepTime); + taskIO.getIdleTimeMsPerSecond().markEnd(); + taskIO.getBackPressuredTimePerSecond().markEnd(); IOMetrics io = taskIO.createSnapshot(); assertEquals(32L, io.getNumRecordsIn()); @@ -60,6 +67,8 @@ public class TaskIOMetricGroupTest { assertEquals(100L, io.getNumBytesIn()); assertEquals(250L, io.getNumBytesOut()); assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount()); - assertEquals(2L, taskIO.getIdleTimeMsPerSecond().getCount()); + assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(sleepTime)); + assertThat( + taskIO.getBackPressuredTimePerSecond().getCount(), greaterThanOrEqualTo(sleepTime)); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index c94c43d..86f13da 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.InputStatus; import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Meter; import 
org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; @@ -47,6 +46,7 @@ import org.apache.flink.runtime.io.network.partition.ChannelStateHolder; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.TimerGauge; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -406,8 +406,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab } TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup(); - final long startTime = System.currentTimeMillis(); - Meter timer; + TimerGauge timer; CompletableFuture<?> resumeFuture; if (!recordWriter.isAvailable()) { timer = ioMetrics.getBackPressuredTimePerSecond(); @@ -417,7 +416,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab resumeFuture = inputProcessor.getAvailableFuture(); } assertNoException( - resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(), timer, startTime))); + resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(), timer))); } private void resetSynchronousSavepointId() { @@ -1323,18 +1322,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab private static class ResumeWrapper implements Runnable { private final Suspension suspendedDefaultAction; - private final Meter timer; - private final long startTime; + private final TimerGauge timer; - public ResumeWrapper(Suspension suspendedDefaultAction, Meter timer, long startTime) { + public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) { 
this.suspendedDefaultAction = suspendedDefaultAction; + timer.markStart(); this.timer = timer; - this.startTime = startTime; } @Override public void run() { - timer.markEvent(System.currentTimeMillis() - startTime); + timer.markEnd(); suspendedDefaultAction.resume(); } }
