This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a137963015e6c79893f9854cc717af417abbb43c Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Dec 30 18:13:09 2020 +0100 [FLINK-20718][metrics] Add busyTimeMsPerSecond metric It is defined as 1000 minus the sum of idleTimeMsPerSecond and backPressuredTimeMsPerSecond (never below 0), or NaN when the busy time cannot be calculated. --- docs/ops/metrics.md | 5 +++++ docs/ops/metrics.zh.md | 5 +++++ .../java/org/apache/flink/runtime/metrics/MetricNames.java | 1 + .../flink/runtime/metrics/groups/TaskIOMetricGroup.java | 14 ++++++++++++++ .../flink/streaming/runtime/tasks/SourceStreamTask.java | 2 ++ .../apache/flink/streaming/runtime/tasks/StreamTask.java | 2 ++ .../streaming/runtime/tasks/SourceStreamTaskTest.java | 4 +++- 7 files changed, 32 insertions(+), 1 deletion(-) diff --git a/docs/ops/metrics.md b/docs/ops/metrics.md index e2e9370..1947b38 100644 --- a/docs/ops/metrics.md +++ b/docs/ops/metrics.md @@ -1259,6 +1259,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi <td>Meter</td> </tr> <tr> + <td>busyTimeMsPerSecond</td> + <td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN if the value could not be calculated.</td> + <td>Gauge</td> + </tr> + <tr> <th rowspan="6"><strong>Task/Operator</strong></th> <td>numRecordsIn</td> <td>The total number of records this operator/task has received.</td> diff --git a/docs/ops/metrics.zh.md b/docs/ops/metrics.zh.md index 19fcba3..5fa8ed6 100644 --- a/docs/ops/metrics.zh.md +++ b/docs/ops/metrics.zh.md @@ -1259,6 +1259,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi <td>Meter</td> </tr> <tr> + <td>busyTimeMsPerSecond</td> + <td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second.
Can be NaN if the value could not be calculated.</td> + <td>Gauge</td> + </tr> + <tr> <th rowspan="6"><strong>Task/Operator</strong></th> <td>numRecordsIn</td> <td>The total number of records this operator/task has received.</td> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 6a58de7..504f309 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -67,5 +67,6 @@ public class MetricNames { } public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE; + public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE; public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 9be2180..5ce4508 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.SimpleCounter; @@ -46,8 +47,11 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { private final Meter numRecordsOutRate; private final Meter numBuffersOutRate; private final Meter idleTimePerSecond; + private final Gauge busyTimePerSecond; private final Meter backPressuredTimePerSecond; + private volatile boolean busyTimeEnabled; + public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); @@
-71,6 +75,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter())); this.backPressuredTimePerSecond = meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter())); + this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond); } public IOMetrics createSnapshot() { @@ -109,6 +114,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return backPressuredTimePerSecond; } + public void setEnableBusyTime(boolean enabled) { + busyTimeEnabled = enabled; + } + + private double getBusyTimePerSecond() { + double busyTime = idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate(); + return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN; + } + // ============================================================================================ // Metric Reuse // ============================================================================================ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 2661279..2cb6902 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -80,6 +80,8 @@ public class SourceStreamTask< StreamTaskActionExecutor.synchronizedExecutor(lock)); this.lock = Preconditions.checkNotNull(lock); this.sourceThread = new LegacySourceFunctionThread(); + + getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 
8d3ce70..c94c43d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -343,6 +343,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab new ExecutorThreadFactory("channel-state-unspilling")); injectChannelStateWriterIntoChannels(); + + environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true); } private void injectChannelStateWriterIntoChannels() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index d7cb5a7..00716c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -110,7 +110,7 @@ public class SourceStreamTaskTest { } @Test(timeout = 60_000) - public void testStartDelayMetric() throws Exception { + public void testMetrics() throws Exception { long sleepTime = 42; StreamTaskMailboxTestHarnessBuilder<String> builder = new StreamTaskMailboxTestHarnessBuilder<>( @@ -145,6 +145,8 @@ public class SourceStreamTaskTest { (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME); assertThat( checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000)); + Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME); + assertTrue(Double.isNaN(busyTimeGauge.getValue())); } /**
