This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 7e3c5a16458cced88360fb318e5b6e8b368a79d1 Author: mas-chen <[email protected]> AuthorDate: Fri Mar 10 00:42:21 2023 -0800 [FLINK-31305] Propagate producer exceptions outside of mailbox executor so that checkpoints can correctly fail This closes #19. --- .../flink/connector/kafka/sink/KafkaWriter.java | 58 +++++++++++++++++++--- .../connector/kafka/sink/KafkaWriterITCase.java | 26 ++++++++++ 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index ba2cb4e4..adc05b8d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -89,7 +89,7 @@ class KafkaWriter<IN> private final KafkaRecordSerializationSchema<IN> recordSerializer; private final Callback deliveryCallback; private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext; - + private volatile Exception asyncProducerException; private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>(); private final SinkWriterMetricGroup metricGroup; private final boolean disabledMetrics; @@ -139,6 +139,7 @@ class KafkaWriter<IN> this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig"); this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix"); this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer"); + checkNotNull(sinkInitContext, "sinkInitContext"); this.deliveryCallback = new WriterCallback( sinkInitContext.getMailboxExecutor(), @@ -150,7 +151,6 @@ class KafkaWriter<IN> || kafkaProducerConfig.containsKey(KEY_REGISTER_METRICS) && !Boolean.parseBoolean( kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString()); - checkNotNull(sinkInitContext, 
"sinkInitContext"); this.timeService = sinkInitContext.getProcessingTimeService(); this.metricGroup = sinkInitContext.metricGroup(); this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); @@ -192,6 +192,8 @@ class KafkaWriter<IN> @Override public void write(@Nullable IN element, Context context) throws IOException { + checkAsyncException(); + final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); if (record != null) { @@ -206,6 +208,8 @@ class KafkaWriter<IN> LOG.debug("final flush={}", endOfInput); currentProducer.flush(); } + + checkAsyncException(); } @Override @@ -241,6 +245,9 @@ class KafkaWriter<IN> checkState(currentProducer.isClosed()); currentProducer = null; }); + + // Rethrow exception for the case in which close is called before writer() and flush(). + checkAsyncException(); } private void abortCurrentProducer() { @@ -264,6 +271,16 @@ class KafkaWriter<IN> return currentProducer; } + @VisibleForTesting + Exception getAsyncProducerException() { + return asyncProducerException; + } + + @VisibleForTesting + void setAsyncProducerException(Exception asyncProducerException) { + this.asyncProducerException = asyncProducerException; + } + void abortLingeringTransactions( Collection<KafkaWriterState> recoveredStates, long startCheckpointId) { List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix); @@ -397,6 +414,18 @@ class KafkaWriter<IN> }); } + /** This logic needs to be invoked by write AND flush since we support various semantics. 
*/ + private void checkAsyncException() throws IOException { + // reset this exception since we could close the writer later on + Exception e = asyncProducerException; + if (e != null) { + + asyncProducerException = null; + throw new IOException( + "One or more Kafka Producer send requests have encountered exception", e); + } + } + private class WriterCallback implements Callback { private final MailboxExecutor mailboxExecutor; @Nullable private final Consumer<RecordMetadata> metadataConsumer; @@ -413,12 +442,27 @@ class KafkaWriter<IN> if (exception != null) { FlinkKafkaInternalProducer<byte[], byte[]> producer = KafkaWriter.this.currentProducer; - mailboxExecutor.execute( + + // Propagate the first exception since amount of exceptions could be large. Need to + // do this in Producer IO thread since flush() guarantees that the future will + // complete. The same guarantee does not hold for tasks executed in separate + // executor e.g. mailbox executor. flush() needs to have the exception immediately + // available to fail the checkpoint. 
+ if (asyncProducerException == null) { + asyncProducerException = decorateException(metadata, exception, producer); + } + + mailboxExecutor.submit( + () -> { + // Need to send metrics through mailbox thread since we are in the + // producer io + // thread numRecordsOutErrorsCounter.inc(); + + // Checking for exceptions from previous writes + checkAsyncException(); + }, - "Failed to send data to Kafka"); + "Update error metric"); } if (metadataConsumer != null) { @@ -426,7 +470,7 @@ class KafkaWriter<IN> } } - private void throwException( + private FlinkRuntimeException decorateException( RecordMetadata metadata, Exception exception, FlinkKafkaInternalProducer<byte[], byte[]> producer) { @@ -435,7 +479,7 @@ class KafkaWriter<IN> if (exception instanceof UnknownProducerIdException) { message += KafkaCommitter.UNKNOWN_PRODUCER_ID_ERROR_MESSAGE; } - throw new FlinkRuntimeException(message, exception); + return new FlinkRuntimeException(message, exception); } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 98f64ed1..e7627b26 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -75,6 +75,7 @@ import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaCo import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic; import static org.apache.flink.util.DockerImageVersions.KAFKA; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; /** Tests for the standalone KafkaWriter. 
*/ @ExtendWith(TestLoggerExtension.class) @@ -223,6 +224,9 @@ public class KafkaWriterITCase { } writer.write(3, SINK_WRITER_CONTEXT); + // this doesn't throw exception because the exception is thrown in the Producer IO + // thread in unit tests due to the mock mailbox executor, while it would be thrown in + // flush() when the real mailbox executor is configured writer.flush(false); writer.prepareCommit(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); @@ -383,6 +387,28 @@ public class KafkaWriterITCase { } } + @Test + public void testErrorPropagation() { + Properties properties = getKafkaClientConfiguration(); + final KafkaWriter<Integer> writer = + createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE); + try { + writer.setAsyncProducerException( + new IOException("previous send request encountered error.")); + assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) + .hasRootCauseExactlyInstanceOf(IOException.class); + + writer.setAsyncProducerException( + new IOException("previous send request encountered error.")); + assertThatCode(() -> writer.flush(false)) + .hasRootCauseExactlyInstanceOf(IOException.class); + } finally { + writer.setAsyncProducerException( + new IOException("previous send request encountered error.")); + assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(IOException.class); + } + } + private void assertKafkaMetricNotPresent( DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception { final Properties config = getKafkaClientConfiguration();
