This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit dd09fa9a3d05c7096c85cb14f2a792a66a915547 Author: mas-chen <[email protected]> AuthorDate: Wed Mar 29 16:40:22 2023 -0700 [FLINK-31305] fix error propagation bug in WriterCallback and use TestSinkInitContext general purpose sink testing tool This closes #22303. --- flink-connector-kafka/pom.xml | 8 + .../flink/connector/kafka/sink/KafkaWriter.java | 23 +-- .../connector/kafka/sink/KafkaWriterITCase.java | 190 +++++++++++++++------ 3 files changed, 149 insertions(+), 72 deletions(-) diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 0355775c..23598f41 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -180,6 +180,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-metrics-jmx</artifactId> diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index adc05b8d..0df3bcf8 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -271,16 +271,6 @@ class KafkaWriter<IN> return currentProducer; } - @VisibleForTesting - Exception getAsyncProducerException() { - return asyncProducerException; - } - - @VisibleForTesting - void setAsyncProducerException(Exception asyncProducerException) { - this.asyncProducerException = asyncProducerException; - } - void abortLingeringTransactions( Collection<KafkaWriterState> recoveredStates, long startCheckpointId) { List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix); @@ -414,13 +404,17 @@ class KafkaWriter<IN> }); } - /** This logic needs to be invoked by write AND flush since we support various semantics. */ + /** + * This method should only be invoked in the mailbox thread since the counter is not volatile. + * Logic needs to be invoked by write AND flush since we support various semantics. + */ private void checkAsyncException() throws IOException { // reset this exception since we could close the writer later on Exception e = asyncProducerException; if (e != null) { asyncProducerException = null; + numRecordsOutErrorsCounter.inc(); throw new IOException( "One or more Kafka Producer send requests have encountered exception", e); } @@ -448,17 +442,12 @@ class KafkaWriter<IN> // complete. The same guarantee does not hold for tasks executed in separate // executor e.g. mailbox executor. flush() needs to have the exception immediately // available to fail the checkpoint. - if (asyncProducerException != null) { + if (asyncProducerException == null) { asyncProducerException = decorateException(metadata, exception, producer); } mailboxExecutor.submit( () -> { - // Need to send metrics through mailbox thread since we are in the - // producer io - // thread - numRecordsOutErrorsCounter.inc(); - // Checking for exceptions from previous writes checkAsyncException(); }, diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index e7627b26..c9d226d1 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -17,20 +17,18 @@ package org.apache.flink.connector.kafka.sink; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; -import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.TestLoggerExtension; @@ -41,6 +39,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -198,39 +197,135 @@ public class KafkaWriterITCase { } @Test - void testNumRecordsOutErrorsCounterMetric() throws Exception { + void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); - final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); - try (final KafkaWriter<Integer> writer = + SinkInitContext sinkInitContext = + new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + timeService, + null); + final KafkaWriter<Integer> writer = createWriterWithConfiguration( - properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { - final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); - assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); + final Counter numRecordsOutErrors = + sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + + triggerProducerException(writer, properties); + + // test flush + assertThatCode(() -> writer.flush(false)) + .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + + assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) + .as("the exception is not thrown again") + .doesNotThrowAnyException(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + } - writer.write(1, SINK_WRITER_CONTEXT); - assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + @Test + void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception { + Properties properties = getKafkaClientConfiguration(); - final String transactionalId = writer.getCurrentProducer().getTransactionalId(); + SinkInitContext sinkInitContext = + new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + timeService, + null); + final KafkaWriter<Integer> writer = + createWriterWithConfiguration( + properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); + final Counter numRecordsOutErrors = + sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + + triggerProducerException(writer, properties); + // to ensure that the exceptional send request has completed + writer.getCurrentProducer().flush(); + + assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) + .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + + assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) + .as("the exception is not thrown again") + .doesNotThrowAnyException(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + } - try (FlinkKafkaInternalProducer<byte[], byte[]> producer = - new FlinkKafkaInternalProducer<>(properties, transactionalId)) { + @Test + void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception { + Properties properties = getKafkaClientConfiguration(); - producer.initTransactions(); - producer.beginTransaction(); - producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes())); - producer.commitTransaction(); - } + SinkInitContext sinkInitContext = + new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + timeService, + null); + final KafkaWriter<Integer> writer = + createWriterWithConfiguration( + properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); + final Counter numRecordsOutErrors = + sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); - writer.write(3, SINK_WRITER_CONTEXT); - // this doesn't throw exception because the exception is thrown in the Producer IO - // thread in unit tests due to the mock mailbox executor, while it would be thrown in - // flush() when the real mailbox executor is configured - writer.flush(false); - writer.prepareCommit(); - assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + triggerProducerException(writer, properties); + // to ensure that the exceptional send request has completed + writer.getCurrentProducer().flush(); + + while (sinkInitContext.getMailboxExecutor().tryYield()) { + // execute all mails + } + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + + assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) + .as("the exception is not thrown again") + .doesNotThrowAnyException(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + } + + @Test + void testCloseAsyncErrorPropagationAndErrorCounter() throws Exception { + Properties properties = getKafkaClientConfiguration(); + + SinkInitContext sinkInitContext = + new SinkInitContext( + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + timeService, + null); + final KafkaWriter<Integer> writer = + createWriterWithConfiguration( + properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); + final Counter numRecordsOutErrors = + sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); + + triggerProducerException(writer, properties); + // to ensure that the exceptional send request has completed + writer.getCurrentProducer().flush(); + + // test flush + assertThatCode(writer::close) + .as("flush should throw the exception from the WriterCallback") + .hasRootCauseExactlyInstanceOf(ProducerFencedException.class); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); + } + + private void triggerProducerException(KafkaWriter<Integer> writer, Properties properties) + throws IOException { + final String transactionalId = writer.getCurrentProducer().getTransactionalId(); + + try (FlinkKafkaInternalProducer<byte[], byte[]> producer = + new FlinkKafkaInternalProducer<>(properties, transactionalId)) { + producer.initTransactions(); + producer.beginTransaction(); + producer.send(new ProducerRecord<byte[], byte[]>(topic, "1".getBytes())); + producer.commitTransaction(); } + + writer.write(1, SINK_WRITER_CONTEXT); } @Test @@ -387,28 +482,6 @@ public class KafkaWriterITCase { } } - @Test - public void testErrorPropagation() { - Properties properties = getKafkaClientConfiguration(); - final KafkaWriter<Integer> writer = - createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE); - try { - writer.setAsyncProducerException( - new IOException("previous send request encountered error.")); - assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT)) - .hasRootCauseExactlyInstanceOf(IOException.class); - - writer.setAsyncProducerException( - new IOException("previous send request encountered error.")); - assertThatCode(() -> writer.flush(false)) - .hasRootCauseExactlyInstanceOf(IOException.class); - } finally { - writer.setAsyncProducerException( - new IOException("previous send request encountered error.")); - assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(IOException.class); - } - } - private void assertKafkaMetricNotPresent( DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception { final Properties config = getKafkaClientConfiguration(); @@ -449,6 +522,18 @@ public class KafkaWriterITCase { ImmutableList.of()); } + private KafkaWriter<Integer> createWriterWithConfiguration( + Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext) { + return new KafkaWriter<>( + guarantee, + config, + "test-prefix", + sinkInitContext, + new DummyRecordSerializer(), + new DummySchemaContext(), + ImmutableList.of()); + } + private static Properties getKafkaClientConfiguration() { final Properties standardProps = new Properties(); standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); @@ -460,7 +545,7 @@ public class KafkaWriterITCase { return standardProps; } - private static class SinkInitContext implements Sink.InitContext { + private static class SinkInitContext extends TestSinkInitContext { private final SinkWriterMetricGroup metricGroup; private final ProcessingTimeService timeService; @@ -480,11 +565,6 @@ public class KafkaWriterITCase { throw new UnsupportedOperationException("Not implemented."); } - @Override - public MailboxExecutor getMailboxExecutor() { - return new SyncMailboxExecutor(); - } - @Override public ProcessingTimeService getProcessingTimeService() { return timeService;
