This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d5f0d49  [FLINK-24382][metrics] Do not compute recordsOut metric in SinkOperator to allow sinks emitting a flexible number of records
d5f0d49 is described below

commit d5f0d492334e54e005fe39b9d2a623ebe6c2f99b
Author: Fabian Paul <[email protected]>
AuthorDate: Mon Sep 27 16:38:07 2021 +0200

    [FLINK-24382][metrics] Do not compute recordsOut metric in SinkOperator to allow sinks emitting a flexible number of records
    
    Initially, we assumed that every incoming record of a sink creates one
    outgoing record after SinkWriter#write is called. This is not correct
    for all sinks, so the SinkWriter itself now increments the counter
    accordingly.
---
 .../apache/flink/connector/file/sink/FileSink.java |  2 +
 .../connector/file/sink/writer/FileWriter.java     |  9 ++++
 .../connector/file/sink/writer/FileWriterTest.java | 61 ++++++++++++++++++++--
 .../flink/connector/kafka/sink/KafkaWriter.java    |  3 ++
 .../connector/kafka/sink/KafkaWriterITCase.java    |  9 ++--
 .../runtime/operators/sink/SinkOperator.java       |  4 --
 .../test/streaming/runtime/SinkMetricsITCase.java  |  4 ++
 7 files changed, 81 insertions(+), 11 deletions(-)
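For sink implementers, the practical effect is that the numRecordsOut counter is now owned by the writer rather than by SinkOperator. Below is a minimal, illustrative sketch of the pattern this patch applies to FileWriter and KafkaWriter; the class and method names are hypothetical, only the metric-group calls come from the diff. The SinkWriterMetricGroup is obtained from Sink.InitContext#metricGroup() when the writer is created, as the hunks below show.

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

    /** Illustrative helper: the writer tracks its own outgoing records. */
    class RecordCountingEmitter<T> {
        private final Counter numRecordsOut;

        RecordCountingEmitter(SinkWriterMetricGroup metricGroup) {
            // Same counter that SinkOperator used to increment once per input record.
            this.numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
        }

        void emit(T record) {
            // ... hand the record to the external system here ...
            // Increment once per record actually emitted; a writer that filters or
            // fans out records can call inc() as many times as appropriate.
            numRecordsOut.inc();
        }
    }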

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 4905231..920a11b 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -287,6 +287,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
         FileWriter<IN> createWriter(InitContext context) throws IOException {
             return new FileWriter<>(
                     basePath,
+                    context.metricGroup(),
                     bucketAssigner,
                     bucketFactory,
                     createBucketWriter(),
@@ -435,6 +436,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
         FileWriter<IN> createWriter(InitContext context) throws IOException {
             return new FileWriter<>(
                     basePath,
+                    context.metricGroup(),
                     bucketAssigner,
                     bucketFactory,
                     createBucketWriter(),
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index ab133ab..15aebd0 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
@@ -85,10 +87,13 @@ public class FileWriter<IN>
 
     private final OutputFileConfig outputFileConfig;
 
+    private final Counter recordsOutCounter;
+
     /**
      * A constructor creating a new empty bucket manager.
      *
      * @param basePath The base path for our buckets.
+     * @param metricGroup {@link SinkWriterMetricGroup} to set sink writer specific metrics.
      * @param bucketAssigner The {@link BucketAssigner} provided by the user.
      * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
      * @param bucketWriter The {@link BucketWriter} to be used when writing data.
@@ -96,6 +101,7 @@ public class FileWriter<IN>
      */
     public FileWriter(
             final Path basePath,
+            final SinkWriterMetricGroup metricGroup,
             final BucketAssigner<IN, String> bucketAssigner,
             final FileWriterBucketFactory<IN> bucketFactory,
             final BucketWriter<IN, String> bucketWriter,
@@ -115,6 +121,8 @@ public class FileWriter<IN>
         this.activeBuckets = new HashMap<>();
         this.bucketerContext = new BucketerContext();
 
+        this.recordsOutCounter =
+                checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
         this.processingTimeService = checkNotNull(processingTimeService);
         checkArgument(
                 bucketCheckInterval > 0,
@@ -181,6 +189,7 @@ public class FileWriter<IN>
         final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
         final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
         bucket.write(element, processingTimeService.getCurrentProcessingTime());
+        recordsOutCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 9f5a051..ab55433 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -27,6 +27,12 @@ import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
@@ -37,6 +43,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -62,6 +69,13 @@ public class FileWriterTest {
 
     @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
+    private MetricListener metricListener;
+
+    @Before
+    public void setUp() {
+        metricListener = new MetricListener();
+    }
+
     @Test
     public void testPreCommit() throws Exception {
         File outDir = TEMP_FOLDER.newFolder();
@@ -260,6 +274,29 @@ public class FileWriterTest {
         testCorrectTimestampPassingInContext(null, 4L, 5L);
     }
 
+    @Test
+    public void testNumberRecordsOutCounter() throws IOException {
+        final OperatorIOMetricGroup operatorIOMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        File outDir = TEMP_FOLDER.newFolder();
+        Path path = new Path(outDir.toURI());
+        Counter recordsCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
+        SinkWriter.Context context = new ContextImpl();
+        FileWriter<String> fileWriter =
+                createWriter(
+                        path,
+                        DefaultRollingPolicy.builder().build(),
+                        new OutputFileConfig("part-", ""),
+                        operatorIOMetricGroup);
+
+        assertEquals(0, recordsCounter.getCount());
+        fileWriter.write("1", context);
+        assertEquals(1, recordsCounter.getCount());
+        fileWriter.write("2", context);
+        fileWriter.write("3", context);
+        assertEquals(3, recordsCounter.getCount());
+    }
+
     private void testCorrectTimestampPassingInContext(
             Long timestamp, long watermark, long processingTime) throws Exception {
         final File outDir = TEMP_FOLDER.newFolder();
@@ -384,13 +421,20 @@ public class FileWriterTest {
 
     // ------------------------------- Utility Methods --------------------------------
 
-    private static FileWriter<String> createWriter(
+    private FileWriter<String> createWriter(
             Path basePath,
             RollingPolicy<String, String> rollingPolicy,
-            OutputFileConfig outputFileConfig)
+            OutputFileConfig outputFileConfig,
+            OperatorIOMetricGroup operatorIOMetricGroup)
             throws IOException {
+        final SinkWriterMetricGroup sinkWriterMetricGroup =
+                operatorIOMetricGroup == null
+                        ? InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
+                        : InternalSinkWriterMetricGroup.mock(
+                                metricListener.getMetricGroup(), operatorIOMetricGroup);
         return new FileWriter<>(
                 basePath,
+                sinkWriterMetricGroup,
                 new FileSinkTestUtils.StringIdentityBucketAssigner(),
                 new DefaultFileWriterBucketFactory<>(),
                 new RowWiseBucketWriter<>(
@@ -402,7 +446,15 @@ public class FileWriterTest {
                 10);
     }
 
-    private static FileWriter<String> createWriter(
+    private FileWriter<String> createWriter(
+            Path basePath,
+            RollingPolicy<String, String> rollingPolicy,
+            OutputFileConfig outputFileConfig)
+            throws IOException {
+        return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+    }
+
+    private FileWriter<String> createWriter(
             Path basePath,
             BucketAssigner<String, String> bucketAssigner,
             RollingPolicy<String, String> rollingPolicy,
@@ -412,6 +464,7 @@ public class FileWriterTest {
             throws IOException {
         return new FileWriter<>(
                 basePath,
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
                 bucketAssigner,
                 new DefaultFileWriterBucketFactory<>(),
                 new RowWiseBucketWriter<>(
@@ -423,7 +476,7 @@ public class FileWriterTest {
                 bucketCheckInterval);
     }
 
-    private static FileWriter<String> restoreWriter(
+    private FileWriter<String> restoreWriter(
             List<FileWriterBucketState> states,
             Path basePath,
             RollingPolicy<String, String> rollingPolicy,
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6209b1b..7cb22e2 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -86,6 +86,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
     private final Counter numBytesOutCounter;
     private final Sink.ProcessingTimeService timeService;
     private final boolean disabledMetrics;
+    private final Counter numRecordsOutCounter;
 
     // Number of outgoing bytes at the latest metric sync
     private long latestOutgoingByteTotal;
@@ -140,6 +141,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -179,6 +181,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
         currentProducer.send(record, deliveryCallback);
+        numRecordsOutCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 145534c..a1be041 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -73,6 +73,7 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the standalone KafkaWriter. */
@@ -136,7 +137,7 @@ public class KafkaWriterITCase extends TestLogger {
     }
 
     @Test
-    public void testIncreasingByteOutCounter() throws Exception {
+    public void testIncreasingRecordBasedCounters() throws Exception {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
         final InternalSinkWriterMetricGroup metricGroup =
@@ -146,9 +147,11 @@ public class KafkaWriterITCase extends TestLogger {
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
             final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
-            Assertions.assertEquals(numBytesOut.getCount(), 0L);
+            final Counter numRecordsOut = operatorIOMetricGroup.getNumRecordsOutCounter();
+            assertEquals(numBytesOut.getCount(), 0L);
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
+            assertEquals(numRecordsOut.getCount(), 1);
             assertThat(numBytesOut.getCount(), greaterThan(0L));
         }
     }
@@ -165,7 +168,7 @@ public class KafkaWriterITCase extends TestLogger {
             final Optional<Gauge<Long>> currentSendTime =
                     metricListener.getGauge("currentSendTime");
             assertTrue(currentSendTime.isPresent());
-            Assertions.assertEquals(currentSendTime.get().getValue(), 0L);
+            assertEquals(currentSendTime.get().getValue(), 0L);
             IntStream.range(0, 100)
                     .forEach(
                             (run) -> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
index abcc670..5684340 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -104,7 +103,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
             writerFactory;
 
     private final MailboxExecutor mailboxExecutor;
-    private Counter numRecordsOutCounter;
     // record endOfInput state to avoid duplicate prepareCommit on final notifyCheckpointComplete
     // once FLIP-147 is fully operational all endOfInput processing needs to be removed
     private boolean endOfInput = false;
@@ -137,7 +135,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
             StreamConfig config,
             Output<StreamRecord<byte[]>> output) {
         super.setup(containingTask, config, output);
-        numRecordsOutCounter = getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
     }
 
     @Override
@@ -164,7 +161,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends AbstractStreamOperator<b
     public void processElement(StreamRecord<InputT> element) throws Exception {
         context.element = element;
         sinkWriter.write(element.getValue(), context);
-        numRecordsOutCounter.inc();
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 7ad504f..f91a013 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -166,10 +167,12 @@ public class SinkMetricsITCase extends TestLogger {
         static final long RECORD_SIZE_IN_BYTES = 10;
         private SinkWriterMetricGroup metricGroup;
         private long sendTime;
+        private Counter recordsOutCounter;
 
         @Override
         public void init(Sink.InitContext context) {
             this.metricGroup = context.metricGroup();
+            this.recordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
             metricGroup.setCurrentSendTimeGauge(() -> sendTime);
         }
 
@@ -177,6 +180,7 @@ public class SinkMetricsITCase extends TestLogger {
         public void write(Long element, Context context) {
             super.write(element, context);
             sendTime = element * BASE_SEND_TIME;
+            recordsOutCounter.inc();
             if (element % 2 == 0) {
                 metricGroup.getNumRecordsOutErrorsCounter().inc();
             }
