This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a137963015e6c79893f9854cc717af417abbb43c
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Dec 30 18:13:09 2020 +0100

    [FLINK-20718][metrics] Add busyTimeMsPerSecond metric
    
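    It is defined as the complement of idleTimeMsPerSecond and
    backPressuredTimeMsPerSecond combined, i.e. 1000 - (idle + back pressured)
    ms per second. It is reported as NaN where busy time is disabled (legacy
    SourceStreamTask).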
---
 docs/ops/metrics.md                                        |  5 +++++
 docs/ops/metrics.zh.md                                     |  5 +++++
 .../java/org/apache/flink/runtime/metrics/MetricNames.java |  1 +
 .../flink/runtime/metrics/groups/TaskIOMetricGroup.java    | 14 ++++++++++++++
 .../flink/streaming/runtime/tasks/SourceStreamTask.java    |  2 ++
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  2 ++
 .../streaming/runtime/tasks/SourceStreamTaskTest.java      |  4 +++-
 7 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/docs/ops/metrics.md b/docs/ops/metrics.md
index e2e9370..1947b38 100644
--- a/docs/ops/metrics.md
+++ b/docs/ops/metrics.md
@@ -1259,6 +1259,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
       <td>Meter</td>
     </tr>
     <tr>
+      <td>busyTimeMsPerSecond</td>
+      <td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
       <td>The total number of records this operator/task has received.</td>
diff --git a/docs/ops/metrics.zh.md b/docs/ops/metrics.zh.md
index 19fcba3..5fa8ed6 100644
--- a/docs/ops/metrics.zh.md
+++ b/docs/ops/metrics.zh.md
@@ -1259,6 +1259,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
       <td>Meter</td>
     </tr>
     <tr>
+      <td>busyTimeMsPerSecond</td>
+      <td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
       <td>The total number of records this operator/task has received.</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 6a58de7..504f309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -67,5 +67,6 @@ public class MetricNames {
     }
 
     public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
+    public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
     public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 9be2180..5ce4508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
@@ -46,8 +47,11 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
     private final Meter numRecordsOutRate;
     private final Meter numBuffersOutRate;
     private final Meter idleTimePerSecond;
+    private final Gauge<Double> busyTimePerSecond;
     private final Meter backPressuredTimePerSecond;
 
+    private volatile boolean busyTimeEnabled;
+
     public TaskIOMetricGroup(TaskMetricGroup parent) {
         super(parent);
 
@@ -71,6 +75,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                 meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter()));
         this.backPressuredTimePerSecond =
                 meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter()));
+        this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
     }
 
     public IOMetrics createSnapshot() {
@@ -109,6 +114,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return backPressuredTimePerSecond;
     }
 
+    public void setEnableBusyTime(boolean enabled) {
+        busyTimeEnabled = enabled;
+    }
+
+    private double getBusyTimePerSecond() {
+        double notBusyTime = idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate();
+        return busyTimeEnabled ? 1000.0 - Math.min(notBusyTime, 1000.0) : Double.NaN;
+    }
+
     // ============================================================================================
     // Metric Reuse
     // ============================================================================================
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 2661279..2cb6902 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -80,6 +80,8 @@ public class SourceStreamTask<
                 StreamTaskActionExecutor.synchronizedExecutor(lock));
         this.lock = Preconditions.checkNotNull(lock);
         this.sourceThread = new LegacySourceFunctionThread();
+
+        getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
     }
 
     @Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8d3ce70..c94c43d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -343,6 +343,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                         new ExecutorThreadFactory("channel-state-unspilling"));
 
         injectChannelStateWriterIntoChannels();
+
+        environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
     }
 
     private void injectChannelStateWriterIntoChannels() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d7cb5a7..00716c6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -110,7 +110,7 @@ public class SourceStreamTaskTest {
     }
 
     @Test(timeout = 60_000)
-    public void testStartDelayMetric() throws Exception {
+    public void testMetrics() throws Exception {
         long sleepTime = 42;
         StreamTaskMailboxTestHarnessBuilder<String> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
@@ -145,6 +145,8 @@ public class SourceStreamTaskTest {
                 (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
         assertThat(
                 checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000));
+        Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME);
+        assertTrue(Double.isNaN(busyTimeGauge.getValue()));
     }
 
     /**

Reply via email to