This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new d5f0d49 [FLINK-24382][metrics] Do not compute recordsOut metric in
SinkOperator to allow sinks emitting a flexible number of records
d5f0d49 is described below
commit d5f0d492334e54e005fe39b9d2a623ebe6c2f99b
Author: Fabian Paul <[email protected]>
AuthorDate: Mon Sep 27 16:38:07 2021 +0200
[FLINK-24382][metrics] Do not compute recordsOut metric in SinkOperator to
allow sinks emitting a flexible number of records
Initially, we assumed that every incoming record of a sink creates one
outgoing record after SinkWriter#write is called. This is not correct
for all sinks; thus, the SinkWriter should now increment the counter
accordingly.
---
.../apache/flink/connector/file/sink/FileSink.java | 2 +
.../connector/file/sink/writer/FileWriter.java | 9 ++++
.../connector/file/sink/writer/FileWriterTest.java | 61 ++++++++++++++++++++--
.../flink/connector/kafka/sink/KafkaWriter.java | 3 ++
.../connector/kafka/sink/KafkaWriterITCase.java | 9 ++--
.../runtime/operators/sink/SinkOperator.java | 4 --
.../test/streaming/runtime/SinkMetricsITCase.java | 4 ++
7 files changed, 81 insertions(+), 11 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 4905231..920a11b 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -287,6 +287,7 @@ public class FileSink<IN> implements Sink<IN,
FileSinkCommittable, FileWriterBuc
FileWriter<IN> createWriter(InitContext context) throws IOException {
return new FileWriter<>(
basePath,
+ context.metricGroup(),
bucketAssigner,
bucketFactory,
createBucketWriter(),
@@ -435,6 +436,7 @@ public class FileSink<IN> implements Sink<IN,
FileSinkCommittable, FileWriterBuc
FileWriter<IN> createWriter(InitContext context) throws IOException {
return new FileWriter<>(
basePath,
+ context.metricGroup(),
bucketAssigner,
bucketFactory,
createBucketWriter(),
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index ab133ab..15aebd0 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
@@ -85,10 +87,13 @@ public class FileWriter<IN>
private final OutputFileConfig outputFileConfig;
+ private final Counter recordsOutCounter;
+
/**
* A constructor creating a new empty bucket manager.
*
* @param basePath The base path for our buckets.
+ * @param metricGroup {@link SinkWriterMetricGroup} to set sink writer
specific metrics.
* @param bucketAssigner The {@link BucketAssigner} provided by the user.
* @param bucketFactory The {@link FileWriterBucketFactory} to be used to
create buckets.
* @param bucketWriter The {@link BucketWriter} to be used when writing
data.
@@ -96,6 +101,7 @@ public class FileWriter<IN>
*/
public FileWriter(
final Path basePath,
+ final SinkWriterMetricGroup metricGroup,
final BucketAssigner<IN, String> bucketAssigner,
final FileWriterBucketFactory<IN> bucketFactory,
final BucketWriter<IN, String> bucketWriter,
@@ -115,6 +121,8 @@ public class FileWriter<IN>
this.activeBuckets = new HashMap<>();
this.bucketerContext = new BucketerContext();
+ this.recordsOutCounter =
+
checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
this.processingTimeService = checkNotNull(processingTimeService);
checkArgument(
bucketCheckInterval > 0,
@@ -181,6 +189,7 @@ public class FileWriter<IN>
final String bucketId = bucketAssigner.getBucketId(element,
bucketerContext);
final FileWriterBucket<IN> bucket =
getOrCreateBucketForBucketId(bucketId);
bucket.write(element,
processingTimeService.getCurrentProcessingTime());
+ recordsOutCounter.inc();
}
@Override
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 9f5a051..ab55433 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -27,6 +27,12 @@ import
org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
@@ -37,6 +43,7 @@ import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -62,6 +69,13 @@ public class FileWriterTest {
@ClassRule public static final TemporaryFolder TEMP_FOLDER = new
TemporaryFolder();
+ private MetricListener metricListener;
+
+ @Before
+ public void setUp() {
+ metricListener = new MetricListener();
+ }
+
@Test
public void testPreCommit() throws Exception {
File outDir = TEMP_FOLDER.newFolder();
@@ -260,6 +274,29 @@ public class FileWriterTest {
testCorrectTimestampPassingInContext(null, 4L, 5L);
}
+ @Test
+ public void testNumberRecordsOutCounter() throws IOException {
+ final OperatorIOMetricGroup operatorIOMetricGroup =
+
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+ File outDir = TEMP_FOLDER.newFolder();
+ Path path = new Path(outDir.toURI());
+ Counter recordsCounter =
operatorIOMetricGroup.getNumRecordsOutCounter();
+ SinkWriter.Context context = new ContextImpl();
+ FileWriter<String> fileWriter =
+ createWriter(
+ path,
+ DefaultRollingPolicy.builder().build(),
+ new OutputFileConfig("part-", ""),
+ operatorIOMetricGroup);
+
+ assertEquals(0, recordsCounter.getCount());
+ fileWriter.write("1", context);
+ assertEquals(1, recordsCounter.getCount());
+ fileWriter.write("2", context);
+ fileWriter.write("3", context);
+ assertEquals(3, recordsCounter.getCount());
+ }
+
private void testCorrectTimestampPassingInContext(
Long timestamp, long watermark, long processingTime) throws
Exception {
final File outDir = TEMP_FOLDER.newFolder();
@@ -384,13 +421,20 @@ public class FileWriterTest {
// ------------------------------- Utility Methods
--------------------------------
- private static FileWriter<String> createWriter(
+ private FileWriter<String> createWriter(
Path basePath,
RollingPolicy<String, String> rollingPolicy,
- OutputFileConfig outputFileConfig)
+ OutputFileConfig outputFileConfig,
+ OperatorIOMetricGroup operatorIOMetricGroup)
throws IOException {
+ final SinkWriterMetricGroup sinkWriterMetricGroup =
+ operatorIOMetricGroup == null
+ ?
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
+ : InternalSinkWriterMetricGroup.mock(
+ metricListener.getMetricGroup(),
operatorIOMetricGroup);
return new FileWriter<>(
basePath,
+ sinkWriterMetricGroup,
new FileSinkTestUtils.StringIdentityBucketAssigner(),
new DefaultFileWriterBucketFactory<>(),
new RowWiseBucketWriter<>(
@@ -402,7 +446,15 @@ public class FileWriterTest {
10);
}
- private static FileWriter<String> createWriter(
+ private FileWriter<String> createWriter(
+ Path basePath,
+ RollingPolicy<String, String> rollingPolicy,
+ OutputFileConfig outputFileConfig)
+ throws IOException {
+ return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+ }
+
+ private FileWriter<String> createWriter(
Path basePath,
BucketAssigner<String, String> bucketAssigner,
RollingPolicy<String, String> rollingPolicy,
@@ -412,6 +464,7 @@ public class FileWriterTest {
throws IOException {
return new FileWriter<>(
basePath,
+
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
bucketAssigner,
new DefaultFileWriterBucketFactory<>(),
new RowWiseBucketWriter<>(
@@ -423,7 +476,7 @@ public class FileWriterTest {
bucketCheckInterval);
}
- private static FileWriter<String> restoreWriter(
+ private FileWriter<String> restoreWriter(
List<FileWriterBucketState> states,
Path basePath,
RollingPolicy<String, String> rollingPolicy,
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6209b1b..7cb22e2 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -86,6 +86,7 @@ class KafkaWriter<IN> implements SinkWriter<IN,
KafkaCommittable, KafkaWriterSta
private final Counter numBytesOutCounter;
private final Sink.ProcessingTimeService timeService;
private final boolean disabledMetrics;
+ private final Counter numRecordsOutCounter;
// Number of outgoing bytes at the latest metric sync
private long latestOutgoingByteTotal;
@@ -140,6 +141,7 @@ class KafkaWriter<IN> implements SinkWriter<IN,
KafkaCommittable, KafkaWriterSta
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesOutCounter =
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+ this.numRecordsOutCounter =
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
@@ -179,6 +181,7 @@ class KafkaWriter<IN> implements SinkWriter<IN,
KafkaCommittable, KafkaWriterSta
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext,
context.timestamp());
currentProducer.send(record, deliveryCallback);
+ numRecordsOutCounter.inc();
}
@Override
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 145534c..a1be041 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -73,6 +73,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for the standalone KafkaWriter. */
@@ -136,7 +137,7 @@ public class KafkaWriterITCase extends TestLogger {
}
@Test
- public void testIncreasingByteOutCounter() throws Exception {
+ public void testIncreasingRecordBasedCounters() throws Exception {
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
final InternalSinkWriterMetricGroup metricGroup =
@@ -146,9 +147,11 @@ public class KafkaWriterITCase extends TestLogger {
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.NONE,
metricGroup)) {
final Counter numBytesOut =
operatorIOMetricGroup.getNumBytesOutCounter();
- Assertions.assertEquals(numBytesOut.getCount(), 0L);
+ final Counter numRecordsOut =
operatorIOMetricGroup.getNumRecordsOutCounter();
+ assertEquals(numBytesOut.getCount(), 0L);
writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
+ assertEquals(numRecordsOut.getCount(), 1);
assertThat(numBytesOut.getCount(), greaterThan(0L));
}
}
@@ -165,7 +168,7 @@ public class KafkaWriterITCase extends TestLogger {
final Optional<Gauge<Long>> currentSendTime =
metricListener.getGauge("currentSendTime");
assertTrue(currentSendTime.isPresent());
- Assertions.assertEquals(currentSendTime.get().getValue(), 0L);
+ assertEquals(currentSendTime.get().getValue(), 0L);
IntStream.range(0, 100)
.forEach(
(run) -> {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
index abcc670..5684340 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -104,7 +103,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends
AbstractStreamOperator<b
writerFactory;
private final MailboxExecutor mailboxExecutor;
- private Counter numRecordsOutCounter;
// record endOfInput state to avoid duplicate prepareCommit on final
notifyCheckpointComplete
// once FLIP-147 is fully operational all endOfInput processing needs to
be removed
private boolean endOfInput = false;
@@ -137,7 +135,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends
AbstractStreamOperator<b
StreamConfig config,
Output<StreamRecord<byte[]>> output) {
super.setup(containingTask, config, output);
- numRecordsOutCounter =
getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
}
@Override
@@ -164,7 +161,6 @@ class SinkOperator<InputT, CommT, WriterStateT> extends
AbstractStreamOperator<b
public void processElement(StreamRecord<InputT> element) throws Exception {
context.element = element;
sinkWriter.write(element.getValue(), context);
- numRecordsOutCounter.inc();
}
@Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 7ad504f..f91a013 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -166,10 +167,12 @@ public class SinkMetricsITCase extends TestLogger {
static final long RECORD_SIZE_IN_BYTES = 10;
private SinkWriterMetricGroup metricGroup;
private long sendTime;
+ private Counter recordsOutCounter;
@Override
public void init(Sink.InitContext context) {
this.metricGroup = context.metricGroup();
+ this.recordsOutCounter =
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
metricGroup.setCurrentSendTimeGauge(() -> sendTime);
}
@@ -177,6 +180,7 @@ public class SinkMetricsITCase extends TestLogger {
public void write(Long element, Context context) {
super.write(element, context);
sendTime = element * BASE_SEND_TIME;
+ recordsOutCounter.inc();
if (element % 2 == 0) {
metricGroup.getNumRecordsOutErrorsCounter().inc();
}