This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c336b848f98bd92df4960126c4335d45289cff21
Author: Jing Ge <[email protected]>
AuthorDate: Tue Mar 15 16:00:02 2022 +0100

    [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
    
    (cherry picked from commit 3ca38240496d1d0f1289f5aecf1226f537e30233)
---
 .../flink/connector/file/sink/writer/FileWriter.java |  7 +++----
 .../connector/file/sink/writer/FileWriterTest.java   | 20 +++++++++++---------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index fad3b50..51cc6d8 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
 
     private final OutputFileConfig outputFileConfig;
 
-    private final Counter recordsOutCounter;
+    private final Counter numRecordsSendCounter;
 
     private boolean endOfInput;
 
@@ -128,8 +128,7 @@ public class FileWriter<IN>
         this.activeBuckets = new HashMap<>();
         this.bucketerContext = new BucketerContext();
 
-        this.recordsOutCounter =
-                
checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
+        this.numRecordsSendCounter = 
checkNotNull(metricGroup).getNumRecordsSendCounter();
         this.processingTimeService = checkNotNull(processingTimeService);
         checkArgument(
                 bucketCheckInterval > 0,
@@ -196,7 +195,7 @@ public class FileWriter<IN>
         final String bucketId = bucketAssigner.getBucketId(element, 
bucketerContext);
         final FileWriterBucket<IN> bucket = 
getOrCreateBucketForBucketId(bucketId);
         bucket.write(element, 
processingTimeService.getCurrentProcessingTime());
-        recordsOutCounter.inc();
+        numRecordsSendCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index e521f61..8966ca7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -283,16 +283,19 @@ public class FileWriterTest {
     public void testNumberRecordsOutCounter() throws IOException, 
InterruptedException {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        final SinkWriterMetricGroup sinkWriterMetricGroup =
+                InternalSinkWriterMetricGroup.mock(
+                        metricListener.getMetricGroup(), 
operatorIOMetricGroup);
         File outDir = TEMP_FOLDER.newFolder();
         Path path = new Path(outDir.toURI());
-        Counter recordsCounter = 
operatorIOMetricGroup.getNumRecordsOutCounter();
+        Counter recordsCounter = 
sinkWriterMetricGroup.getNumRecordsSendCounter();
         SinkWriter.Context context = new ContextImpl();
         FileWriter<String> fileWriter =
                 createWriter(
                         path,
                         DefaultRollingPolicy.builder().build(),
                         new OutputFileConfig("part-", ""),
-                        operatorIOMetricGroup);
+                        sinkWriterMetricGroup);
 
         assertEquals(0, recordsCounter.getCount());
         fileWriter.write("1", context);
@@ -432,13 +435,8 @@ public class FileWriterTest {
             Path basePath,
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig,
-            OperatorIOMetricGroup operatorIOMetricGroup)
+            SinkWriterMetricGroup sinkWriterMetricGroup)
             throws IOException {
-        final SinkWriterMetricGroup sinkWriterMetricGroup =
-                operatorIOMetricGroup == null
-                        ? 
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
-                        : InternalSinkWriterMetricGroup.mock(
-                                metricListener.getMetricGroup(), 
operatorIOMetricGroup);
         return new FileWriter<>(
                 basePath,
                 sinkWriterMetricGroup,
@@ -458,7 +456,11 @@ public class FileWriterTest {
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig)
             throws IOException {
-        return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+        return createWriter(
+                basePath,
+                rollingPolicy,
+                outputFileConfig,
+                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
     }
 
     private FileWriter<String> createWriter(

Reply via email to