This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c336b848f98bd92df4960126c4335d45289cff21 Author: Jing Ge <[email protected]> AuthorDate: Tue Mar 15 16:00:02 2022 +0100 [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly. (cherry picked from commit 3ca38240496d1d0f1289f5aecf1226f537e30233) --- .../flink/connector/file/sink/writer/FileWriter.java | 7 +++---- .../connector/file/sink/writer/FileWriterTest.java | 20 +++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java index fad3b50..51cc6d8 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java @@ -92,7 +92,7 @@ public class FileWriter<IN> private final OutputFileConfig outputFileConfig; - private final Counter recordsOutCounter; + private final Counter numRecordsSendCounter; private boolean endOfInput; @@ -128,8 +128,7 @@ public class FileWriter<IN> this.activeBuckets = new HashMap<>(); this.bucketerContext = new BucketerContext(); - this.recordsOutCounter = - checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter(); + this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter(); this.processingTimeService = checkNotNull(processingTimeService); checkArgument( bucketCheckInterval > 0, @@ -196,7 +195,7 @@ public class FileWriter<IN> final String bucketId = bucketAssigner.getBucketId(element, bucketerContext); final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId); bucket.write(element, processingTimeService.getCurrentProcessingTime()); - recordsOutCounter.inc(); + numRecordsSendCounter.inc(); } @Override diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java index e521f61..8966ca7 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java @@ -283,16 +283,19 @@ public class FileWriterTest { public void testNumberRecordsOutCounter() throws IOException, InterruptedException { final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); + final SinkWriterMetricGroup sinkWriterMetricGroup = + InternalSinkWriterMetricGroup.mock( + metricListener.getMetricGroup(), operatorIOMetricGroup); File outDir = TEMP_FOLDER.newFolder(); Path path = new Path(outDir.toURI()); - Counter recordsCounter = operatorIOMetricGroup.getNumRecordsOutCounter(); + Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter(); SinkWriter.Context context = new ContextImpl(); FileWriter<String> fileWriter = createWriter( path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""), - operatorIOMetricGroup); + sinkWriterMetricGroup); assertEquals(0, recordsCounter.getCount()); fileWriter.write("1", context); @@ -432,13 +435,8 @@ public class FileWriterTest { Path basePath, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig, - OperatorIOMetricGroup operatorIOMetricGroup) + SinkWriterMetricGroup sinkWriterMetricGroup) throws IOException { - final SinkWriterMetricGroup sinkWriterMetricGroup = - operatorIOMetricGroup == null - ? 
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()) - : InternalSinkWriterMetricGroup.mock( - metricListener.getMetricGroup(), operatorIOMetricGroup); return new FileWriter<>( basePath, sinkWriterMetricGroup, @@ -458,7 +456,11 @@ public class FileWriterTest { RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) throws IOException { - return createWriter(basePath, rollingPolicy, outputFileConfig, null); + return createWriter( + basePath, + rollingPolicy, + outputFileConfig, + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())); } private FileWriter<String> createWriter(
