This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new be13d05b7f6 [FLINK-31752] Fix SourceOperator numRecordsOut duplicate bug
be13d05b7f6 is described below
commit be13d05b7f66dc5f0c926565ff139aa8313fcdc5
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Apr 24 00:00:54 2023 +0800
[FLINK-31752] Fix SourceOperator numRecordsOut duplicate bug
This closes #22462.
---
.../flink/connector/base/source/reader/SourceMetricsITCase.java | 4 ++++
.../flink/connector/base/source/reader/mocks/MockRecordEmitter.java | 1 +
.../flink/streaming/runtime/tasks/SourceOperatorStreamTask.java | 4 ----
3 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index 191c383ad13..8b00c8d9776 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -182,6 +182,10 @@ public class SourceMetricsITCase extends TestLogger {
.isEqualTo(processedRecordsPerSubtask);
assertThatCounter(group.getIOMetricGroup().getNumBytesInCounter())
.isEqualTo(processedRecordsPerSubtask * MockRecordEmitter.RECORD_SIZE_IN_BYTES);
+ assertThatCounter(group.getIOMetricGroup().getNumRecordsOutCounter())
+ .isEqualTo(processedRecordsPerSubtask);
+ assertThatCounter(group.getIOMetricGroup().getNumBytesOutCounter())
+ .isEqualTo(processedRecordsPerSubtask * MockRecordEmitter.RECORD_SIZE_IN_BYTES);
// MockRecordEmitter is just incrementing errors every even record
assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS))
.isEqualTo(processedRecordsPerSubtask / 2);
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
index af700c47813..c2ce38ee2ad 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
@@ -53,6 +53,7 @@ public class MockRecordEmitter implements RecordEmitter<int[], Integer, MockSpli
this.metricGroup.getNumRecordsInErrorsCounter().inc();
}
this.metricGroup.getIOMetricGroup().getNumBytesInCounter().inc(RECORD_SIZE_IN_BYTES);
+ this.metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
// The value is the first element.
output.collect(record[0]);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 3d2805599fb..70fa37fc1d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
@@ -293,7 +292,6 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
private final Output<StreamRecord<T>> output;
private final InternalSourceReaderMetricGroup metricGroup;
@Nullable private final WatermarkGauge inputWatermarkGauge;
- private final Counter numRecordsOut;
public AsyncDataOutputToOutput(
Output<StreamRecord<T>> output,
@@ -301,14 +299,12 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
@Nullable WatermarkGauge inputWatermarkGauge) {
this.output = checkNotNull(output);
- this.numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
this.inputWatermarkGauge = inputWatermarkGauge;
this.metricGroup = metricGroup;
}
@Override
public void emitRecord(StreamRecord<T> streamRecord) {
- numRecordsOut.inc();
metricGroup.recordEmitted(streamRecord.getTimestamp());
output.collect(streamRecord);
}