This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d07df5791f42b64891c5396f912841c9c7133d7a
Author: Jing Ge <[email protected]>
AuthorDate: Wed Mar 2 21:06:30 2022 +0100

    [FLINK-26126][kafka] develop record out error counter metric
---
 .../flink/connector/kafka/sink/KafkaWriter.java    |  7 ++++-
 .../connector/kafka/sink/KafkaWriterITCase.java    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 8102ee8..ecd62a0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -95,6 +95,7 @@ class KafkaWriter<IN>
     private final boolean disabledMetrics;
     private final Counter numRecordsSendCounter;
     private final Counter numBytesSendCounter;
+    private final Counter numRecordsOutErrorsCounter;
     private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
@@ -154,6 +155,7 @@ class KafkaWriter<IN>
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
         this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+        this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -410,7 +412,10 @@ class KafkaWriter<IN>
                 FlinkKafkaInternalProducer<byte[], byte[]> producer =
                         KafkaWriter.this.currentProducer;
                 mailboxExecutor.execute(
-                        () -> throwException(metadata, exception, producer),
+                        () -> {
+                            numRecordsOutErrorsCounter.inc();
+                            throwException(metadata, exception, producer);
+                        },
                         "Failed to send data to Kafka");
             }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index b8447a1..ee21d04 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -193,6 +193,42 @@ public class KafkaWriterITCase {
     }
 
     @Test
+    void testNumRecordsOutErrorsCounterMetric() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        final InternalSinkWriterMetricGroup metricGroup =
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
+
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
+            final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L);
+
+            writer.write(1, SINK_WRITER_CONTEXT);
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L);
+
+            final String transactionalId = writer.getCurrentProducer().getTransactionalId();
+
+            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
+
+                producer.initTransactions();
+                producer.beginTransaction();
+                producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes()));
+                producer.commitTransaction();
+            }
+
+            writer.write(3, SINK_WRITER_CONTEXT);
+            writer.flush(false);
+            writer.prepareCommit();
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(1L);
+        }
+    }
+
+    @Test
     public void testMetadataPublisher() throws Exception {
         List<String> metadataList = new ArrayList<>();
         try (final KafkaWriter<Integer> writer =

Reply via email to