This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d07df5791f42b64891c5396f912841c9c7133d7a Author: Jing Ge <[email protected]> AuthorDate: Wed Mar 2 21:06:30 2022 +0100 [FLINK-26126][kafka] develop record out error counter metric --- .../flink/connector/kafka/sink/KafkaWriter.java | 7 ++++- .../connector/kafka/sink/KafkaWriterITCase.java | 36 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index 8102ee8..ecd62a0 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -95,6 +95,7 @@ class KafkaWriter<IN> private final boolean disabledMetrics; private final Counter numRecordsSendCounter; private final Counter numBytesSendCounter; + private final Counter numRecordsOutErrorsCounter; private final ProcessingTimeService timeService; // Number of outgoing bytes at the latest metric sync @@ -154,6 +155,7 @@ class KafkaWriter<IN> this.metricGroup = sinkInitContext.metricGroup(); this.numBytesSendCounter = metricGroup.getNumBytesSendCounter(); this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); + this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter(); this.kafkaSinkContext = new DefaultKafkaSinkContext( sinkInitContext.getSubtaskId(), @@ -410,7 +412,10 @@ class KafkaWriter<IN> FlinkKafkaInternalProducer<byte[], byte[]> producer = KafkaWriter.this.currentProducer; mailboxExecutor.execute( - () -> throwException(metadata, exception, producer), + () -> { + numRecordsOutErrorsCounter.inc(); + throwException(metadata, exception, producer); + }, "Failed to send data to Kafka"); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index b8447a1..ee21d04 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -193,6 +193,42 @@ public class KafkaWriterITCase { } @Test + void testNumRecordsOutErrorsCounterMetric() throws Exception { + Properties properties = getKafkaClientConfiguration(); + final InternalSinkWriterMetricGroup metricGroup = + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); + + try (final KafkaWriter<Integer> writer = + createWriterWithConfiguration( + properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { + final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); + org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) + .isEqualTo(0L); + + writer.write(1, SINK_WRITER_CONTEXT); + org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) + .isEqualTo(0L); + + final String transactionalId = writer.getCurrentProducer().getTransactionalId(); + + try (FlinkKafkaInternalProducer<byte[], byte[]> producer = + new FlinkKafkaInternalProducer<>(properties, transactionalId)) { + + producer.initTransactions(); + producer.beginTransaction(); + producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes())); + producer.commitTransaction(); + } + + writer.write(3, SINK_WRITER_CONTEXT); + writer.flush(false); + writer.prepareCommit(); + org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) + .isEqualTo(1L); + } + } + + @Test public void testMetadataPublisher() throws Exception { List<String> metadataList = new ArrayList<>(); try (final KafkaWriter<Integer> writer =
