This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new be13d05b7f6 [FLINK-31752] Fix SourceOperator numRecordsOut duplicate 
bug
be13d05b7f6 is described below

commit be13d05b7f66dc5f0c926565ff139aa8313fcdc5
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Apr 24 00:00:54 2023 +0800

    [FLINK-31752] Fix SourceOperator numRecordsOut duplicate bug
    
    This closes #22462.
---
 .../flink/connector/base/source/reader/SourceMetricsITCase.java       | 4 ++++
 .../flink/connector/base/source/reader/mocks/MockRecordEmitter.java   | 1 +
 .../flink/streaming/runtime/tasks/SourceOperatorStreamTask.java       | 4 ----
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index 191c383ad13..8b00c8d9776 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -182,6 +182,10 @@ public class SourceMetricsITCase extends TestLogger {
                     .isEqualTo(processedRecordsPerSubtask);
             assertThatCounter(group.getIOMetricGroup().getNumBytesInCounter())
                     .isEqualTo(processedRecordsPerSubtask * 
MockRecordEmitter.RECORD_SIZE_IN_BYTES);
+            
assertThatCounter(group.getIOMetricGroup().getNumRecordsOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(group.getIOMetricGroup().getNumBytesOutCounter())
+                    .isEqualTo(processedRecordsPerSubtask * 
MockRecordEmitter.RECORD_SIZE_IN_BYTES);
             // MockRecordEmitter is just incrementing errors every even record
             assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS))
                     .isEqualTo(processedRecordsPerSubtask / 2);
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
index af700c47813..c2ce38ee2ad 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
@@ -53,6 +53,7 @@ public class MockRecordEmitter implements 
RecordEmitter<int[], Integer, MockSpli
             this.metricGroup.getNumRecordsInErrorsCounter().inc();
         }
         
this.metricGroup.getIOMetricGroup().getNumBytesInCounter().inc(RECORD_SIZE_IN_BYTES);
+        
this.metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
 
         // The value is the first element.
         output.collect(record[0]);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 3d2805599fb..70fa37fc1d0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
 import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SavepointType;
@@ -293,7 +292,6 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
         private final Output<StreamRecord<T>> output;
         private final InternalSourceReaderMetricGroup metricGroup;
         @Nullable private final WatermarkGauge inputWatermarkGauge;
-        private final Counter numRecordsOut;
 
         public AsyncDataOutputToOutput(
                 Output<StreamRecord<T>> output,
@@ -301,14 +299,12 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                 @Nullable WatermarkGauge inputWatermarkGauge) {
 
             this.output = checkNotNull(output);
-            this.numRecordsOut = 
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
             this.inputWatermarkGauge = inputWatermarkGauge;
             this.metricGroup = metricGroup;
         }
 
         @Override
         public void emitRecord(StreamRecord<T> streamRecord) {
-            numRecordsOut.inc();
             metricGroup.recordEmitted(streamRecord.getTimestamp());
             output.collect(streamRecord);
         }

Reply via email to