This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0309f13e8af [FLINK-33681][Runtime/Metrics] Reuse input/output metrics
of SourceOperator/SinkWriterOperator for task (#23998)
0309f13e8af is described below
commit 0309f13e8af62f9b523e227a3a66ff59e838a1b4
Author: Zhanghao Chen <[email protected]>
AuthorDate: Thu Aug 22 06:31:44 2024 +0800
[FLINK-33681][Runtime/Metrics] Reuse input/output metrics of
SourceOperator/SinkWriterOperator for task (#23998)
---
.../base/source/reader/SourceMetricsITCase.java | 21 +++++++++++++++++++++
.../groups/InternalOperatorIOMetricGroup.java | 12 ++++++++++++
.../runtime/metrics/groups/TaskIOMetricGroup.java | 17 +++++++++++++----
.../flink/runtime/testutils/InMemoryReporter.java | 22 +++++++++++++++++++++-
.../streaming/api/operators/SourceOperator.java | 5 +++++
.../runtime/operators/sink/SinkWriterOperator.java | 15 +++++++++++++++
.../test/streaming/runtime/SinkMetricsITCase.java | 21 +++++++++++++++++++++
.../streaming/runtime/SinkV2MetricsITCase.java | 20 ++++++++++++++++++++
8 files changed, 128 insertions(+), 5 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index b7631ef0311..3c315f694b9 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -223,6 +224,26 @@ public class SourceMetricsITCase extends TestLogger {
assertThatGauge(metrics.get(MetricNames.SOURCE_IDLE_TIME)).isEqualTo(0L);
}
assertThat(subtaskWithMetrics).isEqualTo(numSplits);
+
+ // Test operator I/O metrics are reused by task metrics
+ List<TaskMetricGroup> taskMetricGroups =
+ reporter.findTaskMetricGroups(jobId, "MetricTestingSource");
+ assertThat(taskMetricGroups).hasSize(parallelism);
+
+ int subtaskWithTaskMetrics = 0;
+ for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+ // there are only 2 splits assigned, so two groups will not update metrics
+ if
(taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) {
+ continue;
+ }
+
+ subtaskWithTaskMetrics++;
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter())
+ .isEqualTo(processedRecordsPerSubtask);
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesInCounter())
+ .isEqualTo(processedRecordsPerSubtask *
MockRecordEmitter.RECORD_SIZE_IN_BYTES);
+ }
+ assertThat(subtaskWithTaskMetrics).isEqualTo(numSplits);
}
private static class LaggingTimestampAssigner
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
index 31cf560ce78..0405b2d6e07 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java
@@ -97,4 +97,16 @@ public class InternalOperatorIOMetricGroup extends
ProxyMetricGroup<InternalOper
TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
taskIO.reuseRecordsOutputCounter(this.numRecordsOut);
}
+
+ /** Causes the containing task to use this operator's input bytes counter. */
+ public void reuseBytesInputMetricsForTask() {
+ TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
+ taskIO.reuseBytesInputCounter(this.numBytesIn);
+ }
+
+ /** Causes the containing task to use this operator's output bytes counter. */
+ public void reuseBytesOutputMetricsForTask() {
+ TaskIOMetricGroup taskIO = parentMetricGroup.getTaskIOMetricGroup();
+ taskIO.reuseBytesOutputCounter(this.numBytesOut);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 12ba23cea21..5a034f6b4e4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -50,8 +50,8 @@ public class TaskIOMetricGroup extends
ProxyMetricGroup<TaskMetricGroup> {
private final Clock clock;
- private final Counter numBytesIn;
- private final Counter numBytesOut;
+ private final SumCounter numBytesIn;
+ private final SumCounter numBytesOut;
private final SumCounter numRecordsIn;
private final SumCounter numRecordsOut;
private final Counter numBuffersOut;
@@ -95,8 +95,8 @@ public class TaskIOMetricGroup extends
ProxyMetricGroup<TaskMetricGroup> {
public TaskIOMetricGroup(TaskMetricGroup parent, Clock clock) {
super(parent);
this.clock = clock;
- this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN);
- this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT);
+ this.numBytesIn = counter(MetricNames.IO_NUM_BYTES_IN, new
SumCounter());
+ this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT, new
SumCounter());
this.numBytesInRate = meter(MetricNames.IO_NUM_BYTES_IN_RATE, new
MeterView(numBytesIn));
this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new
MeterView(numBytesOut));
@@ -325,6 +325,15 @@ public class TaskIOMetricGroup extends
ProxyMetricGroup<TaskMetricGroup> {
//
============================================================================================
// Metric Reuse
//
============================================================================================
+
+ public void reuseBytesInputCounter(Counter numBytesInCounter) {
+ this.numBytesIn.addCounter(numBytesInCounter);
+ }
+
+ public void reuseBytesOutputCounter(Counter numBytesOutCounter) {
+ this.numBytesOut.addCounter(numBytesOutCounter);
+ }
+
public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
this.numRecordsIn.addCounter(numRecordsInCounter);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
index 4abb4558d4b..1564bb44f02 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.slf4j.Logger;
@@ -167,6 +168,21 @@ public class InMemoryReporter implements MetricReporter {
}
}
+ public List<TaskMetricGroup> findTaskMetricGroups(JobID jobId, String
operatorPattern) {
+ Pattern pattern = Pattern.compile(operatorPattern);
+ synchronized (this) {
+ return metrics.keySet().stream()
+ .filter(
+ g ->
+ g instanceof TaskMetricGroup
+ &&
pattern.matcher(getTaskName(g)).find()
+ &&
getJobId(g).equals(jobId.toString()))
+ .map(TaskMetricGroup.class::cast)
+ .sorted(Comparator.comparing(this::getSubtaskId))
+ .collect(Collectors.toList());
+ }
+ }
+
public List<Tuple3<MetricGroup, String, Metric>> findJobMetricGroups(
JobID jobId, String metricPattern) {
Pattern pattern = Pattern.compile(metricPattern);
@@ -189,7 +205,7 @@ public class InMemoryReporter implements MetricReporter {
}
}
- private String getSubtaskId(OperatorMetricGroup g) {
+ private String getSubtaskId(MetricGroup g) {
return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_SUBTASK_INDEX);
}
@@ -197,6 +213,10 @@ public class InMemoryReporter implements MetricReporter {
return g.getAllVariables().get(ScopeFormat.SCOPE_OPERATOR_NAME);
}
+ private String getTaskName(MetricGroup g) {
+ return g.getAllVariables().get(ScopeFormat.SCOPE_TASK_NAME);
+ }
+
private String getJobId(MetricGroup g) {
return g.getAllVariables().get(ScopeFormat.SCOPE_JOB_ID);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b915ab5ba88..3c7c1964a37 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -238,6 +238,11 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
initSourceMetricGroup();
+ // Metrics "numRecordsIn" & "numBytesIn" are defined as the total number of records/bytes
+ // read from the external system in FLIP-33; reuse them for the task to account for traffic
+ // with the external system
+ this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
+ this.metrics.getIOMetricGroup().reuseBytesInputMetricsForTask();
}
@VisibleForTesting
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index f39b9f9cdf1..ea9a60d6133 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -44,11 +44,13 @@ import
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.UserCodeClassLoader;
import javax.annotation.Nullable;
@@ -129,6 +131,19 @@ class SinkWriterOperator<InputT, CommT> extends
AbstractStreamOperator<Committab
}
}
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<CommittableMessage<CommT>>> output) {
+ super.setup(containingTask, config, output);
+ // Metrics "numRecordsOut" & "numBytesOut" are defined as the total number of records/bytes
+ // written to the external system in FLIP-33; reuse them for the task to account for traffic
+ // with the external system
+ this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
+ this.metrics.getIOMetricGroup().reuseBytesOutputMetricsForTask();
+ }
+
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index fff561013e9..54d5734e2e6 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -159,6 +160,26 @@ public class SinkMetricsITCase extends TestLogger {
.isEqualTo((processedRecordsPerSubtask - 1) *
MetricWriter.BASE_SEND_TIME);
}
assertThat(subtaskWithMetrics, equalTo(numSplits));
+
+ // Test operator I/O metrics are reused by task metrics
+ List<TaskMetricGroup> taskMetricGroups =
+ reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+ assertThat(taskMetricGroups, hasSize(parallelism));
+
+ int subtaskWithTaskMetrics = 0;
+ for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+ // there are only 2 splits assigned, so two groups will not update metrics
+ if
(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+ continue;
+ }
+
+ subtaskWithTaskMetrics++;
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter())
+ .isEqualTo(processedRecordsPerSubtask);
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter())
+ .isEqualTo(processedRecordsPerSubtask *
MetricWriter.RECORD_SIZE_IN_BYTES);
+ }
+ assertThat(subtaskWithTaskMetrics, equalTo(numSplits));
}
private static class MetricWriter extends TestSink.DefaultSinkWriter<Long>
{
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
index 4fd5a2e5770..bb227552a3e 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -213,6 +214,25 @@ public class SinkV2MetricsITCase extends TestLogger {
.isEqualTo((processedRecordsPerSubtask - 1) *
MetricWriter.BASE_SEND_TIME);
}
assertThat(subtaskWithMetrics, equalTo(numSplits));
+
+ // Test operator I/O metrics are reused by task metrics
+ List<TaskMetricGroup> taskMetricGroups =
+ reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+
+ int subtaskWithTaskMetrics = 0;
+ for (TaskMetricGroup taskMetricGroup : taskMetricGroups) {
+ // there are only 2 splits assigned, so two groups will not update metrics
+ if
(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+ continue;
+ }
+
+ subtaskWithTaskMetrics++;
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter())
+ .isEqualTo(processedRecordsPerSubtask);
+
assertThatCounter(taskMetricGroup.getIOMetricGroup().getNumBytesOutCounter())
+ .isEqualTo(processedRecordsPerSubtask *
MetricWriter.RECORD_SIZE_IN_BYTES);
+ }
+ assertThat(subtaskWithTaskMetrics, equalTo(numSplits));
}
private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long>
expected) {