This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 7e3c5a16458cced88360fb318e5b6e8b368a79d1
Author: mas-chen <[email protected]>
AuthorDate: Fri Mar 10 00:42:21 2023 -0800

    [FLINK-31305] Propagate producer exceptions outside of mailbox executor so 
that checkpoints can correctly fail
    
    This closes #19.
---
 .../flink/connector/kafka/sink/KafkaWriter.java    | 58 +++++++++++++++++++---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 26 ++++++++++
 2 files changed, 77 insertions(+), 7 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index ba2cb4e4..adc05b8d 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -89,7 +89,7 @@ class KafkaWriter<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Callback deliveryCallback;
     private final KafkaRecordSerializationSchema.KafkaSinkContext 
kafkaSinkContext;
-
+    private volatile Exception asyncProducerException;
     private final Map<String, KafkaMetricMutableWrapper> 
previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
     private final boolean disabledMetrics;
@@ -139,6 +139,7 @@ class KafkaWriter<IN>
         this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, 
"kafkaProducerConfig");
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, 
"transactionalIdPrefix");
         this.recordSerializer = checkNotNull(recordSerializer, 
"recordSerializer");
+        checkNotNull(sinkInitContext, "sinkInitContext");
         this.deliveryCallback =
                 new WriterCallback(
                         sinkInitContext.getMailboxExecutor(),
@@ -150,7 +151,6 @@ class KafkaWriter<IN>
                         || 
kafkaProducerConfig.containsKey(KEY_REGISTER_METRICS)
                                 && !Boolean.parseBoolean(
                                         
kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString());
-        checkNotNull(sinkInitContext, "sinkInitContext");
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesOutCounter = 
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
@@ -192,6 +192,8 @@ class KafkaWriter<IN>
 
     @Override
     public void write(@Nullable IN element, Context context) throws 
IOException {
+        checkAsyncException();
+
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, 
context.timestamp());
         if (record != null) {
@@ -206,6 +208,8 @@ class KafkaWriter<IN>
             LOG.debug("final flush={}", endOfInput);
             currentProducer.flush();
         }
+
+        checkAsyncException();
     }
 
     @Override
@@ -241,6 +245,9 @@ class KafkaWriter<IN>
                     checkState(currentProducer.isClosed());
                     currentProducer = null;
                 });
+
+        // Rethrow exception for the case in which close is called before 
writer() and flush().
+        checkAsyncException();
     }
 
     private void abortCurrentProducer() {
@@ -264,6 +271,16 @@ class KafkaWriter<IN>
         return currentProducer;
     }
 
+    @VisibleForTesting
+    Exception getAsyncProducerException() {
+        return asyncProducerException;
+    }
+
+    @VisibleForTesting
+    void setAsyncProducerException(Exception asyncProducerException) {
+        this.asyncProducerException = asyncProducerException;
+    }
+
     void abortLingeringTransactions(
             Collection<KafkaWriterState> recoveredStates, long 
startCheckpointId) {
         List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
@@ -397,6 +414,18 @@ class KafkaWriter<IN>
                 });
     }
 
+    /** This logic needs to be invoked by write AND flush since we support 
various semantics. */
+    private void checkAsyncException() throws IOException {
+        // reset this exception since we could close the writer later on
+        Exception e = asyncProducerException;
+        if (e != null) {
+
+            asyncProducerException = null;
+            throw new IOException(
+                    "One or more Kafka Producer send requests have encountered 
exception", e);
+        }
+    }
+
     private class WriterCallback implements Callback {
         private final MailboxExecutor mailboxExecutor;
         @Nullable private final Consumer<RecordMetadata> metadataConsumer;
@@ -413,12 +442,27 @@ class KafkaWriter<IN>
             if (exception != null) {
                 FlinkKafkaInternalProducer<byte[], byte[]> producer =
                         KafkaWriter.this.currentProducer;
-                mailboxExecutor.execute(
+
+                // Propagate the first exception since amount of exceptions 
could be large. Need to
+                // do this in Producer IO thread since flush() guarantees that 
the future will
+                // complete. The same guarantee does not hold for tasks 
executed in separate
+                // executor e.g. mailbox executor. flush() needs to have the 
exception immediately
+                // available to fail the checkpoint.
+                if (asyncProducerException == null) {
+                    asyncProducerException = decorateException(metadata, 
exception, producer);
+                }
+
+                mailboxExecutor.submit(
                         () -> {
+                            // Need to send metrics through mailbox thread 
since we are in the
+                            // producer io
+                            // thread
                             numRecordsOutErrorsCounter.inc();
-                            throwException(metadata, exception, producer);
+
+                            // Checking for exceptions from previous writes
+                            checkAsyncException();
                         },
-                        "Failed to send data to Kafka");
+                        "Update error metric");
             }
 
             if (metadataConsumer != null) {
@@ -426,7 +470,7 @@ class KafkaWriter<IN>
             }
         }
 
-        private void throwException(
+        private FlinkRuntimeException decorateException(
                 RecordMetadata metadata,
                 Exception exception,
                 FlinkKafkaInternalProducer<byte[], byte[]> producer) {
@@ -435,7 +479,7 @@ class KafkaWriter<IN>
             if (exception instanceof UnknownProducerIdException) {
                 message += KafkaCommitter.UNKNOWN_PRODUCER_ID_ERROR_MESSAGE;
             }
-            throw new FlinkRuntimeException(message, exception);
+            return new FlinkRuntimeException(message, exception);
         }
     }
 }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 98f64ed1..e7627b26 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -75,6 +75,7 @@ import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaCo
 import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
 
 /** Tests for the standalone KafkaWriter. */
 @ExtendWith(TestLoggerExtension.class)
@@ -223,6 +224,9 @@ public class KafkaWriterITCase {
             }
 
             writer.write(3, SINK_WRITER_CONTEXT);
+            // this doesn't throw exception because the exception is thrown in 
the Producer IO
+            // thread in unit tests due to the mock mailbox executor, while it 
would be thrown in
+            // flush() when the real mailbox executor is configured
             writer.flush(false);
             writer.prepareCommit();
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
@@ -383,6 +387,28 @@ public class KafkaWriterITCase {
         }
     }
 
+    @Test
+    public void testErrorPropagation() {
+        Properties properties = getKafkaClientConfiguration();
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, 
DeliveryGuarantee.AT_LEAST_ONCE);
+        try {
+            writer.setAsyncProducerException(
+                    new IOException("previous send request encountered 
error."));
+            assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                    .hasRootCauseExactlyInstanceOf(IOException.class);
+
+            writer.setAsyncProducerException(
+                    new IOException("previous send request encountered 
error."));
+            assertThatCode(() -> writer.flush(false))
+                    .hasRootCauseExactlyInstanceOf(IOException.class);
+        } finally {
+            writer.setAsyncProducerException(
+                    new IOException("previous send request encountered 
error."));
+            
assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(IOException.class);
+        }
+    }
+
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) 
throws Exception {
         final Properties config = getKafkaClientConfiguration();

Reply via email to