This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit dd09fa9a3d05c7096c85cb14f2a792a66a915547
Author: mas-chen <[email protected]>
AuthorDate: Wed Mar 29 16:40:22 2023 -0700

    [FLINK-31305] Fix error propagation bug in WriterCallback and use 
TestSinkInitContext, a general-purpose sink testing tool
    
    This closes #22303.
---
 flink-connector-kafka/pom.xml                      |   8 +
 .../flink/connector/kafka/sink/KafkaWriter.java    |  23 +--
 .../connector/kafka/sink/KafkaWriterITCase.java    | 190 +++++++++++++++------
 3 files changed, 149 insertions(+), 72 deletions(-)

diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 0355775c..23598f41 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -180,6 +180,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-metrics-jmx</artifactId>
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index adc05b8d..0df3bcf8 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -271,16 +271,6 @@ class KafkaWriter<IN>
         return currentProducer;
     }
 
-    @VisibleForTesting
-    Exception getAsyncProducerException() {
-        return asyncProducerException;
-    }
-
-    @VisibleForTesting
-    void setAsyncProducerException(Exception asyncProducerException) {
-        this.asyncProducerException = asyncProducerException;
-    }
-
     void abortLingeringTransactions(
             Collection<KafkaWriterState> recoveredStates, long 
startCheckpointId) {
         List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
@@ -414,13 +404,17 @@ class KafkaWriter<IN>
                 });
     }
 
-    /** This logic needs to be invoked by write AND flush since we support 
various semantics. */
+    /**
+     * This method should only be invoked in the mailbox thread since the 
counter is not volatile.
+     * Logic needs to be invoked by write AND flush since we support various 
semantics.
+     */
     private void checkAsyncException() throws IOException {
         // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
 
             asyncProducerException = null;
+            numRecordsOutErrorsCounter.inc();
             throw new IOException(
                     "One or more Kafka Producer send requests have encountered 
exception", e);
         }
@@ -448,17 +442,12 @@ class KafkaWriter<IN>
                 // complete. The same guarantee does not hold for tasks 
executed in separate
                 // executor e.g. mailbox executor. flush() needs to have the 
exception immediately
                 // available to fail the checkpoint.
-                if (asyncProducerException != null) {
+                if (asyncProducerException == null) {
                     asyncProducerException = decorateException(metadata, 
exception, producer);
                 }
 
                 mailboxExecutor.submit(
                         () -> {
-                            // Need to send metrics through mailbox thread 
since we are in the
-                            // producer io
-                            // thread
-                            numRecordsOutErrorsCounter.inc();
-
                             // Checking for exceptions from previous writes
                             checkAsyncException();
                         },
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index e7627b26..c9d226d1 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -17,20 +17,18 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLoggerExtension;
@@ -41,6 +39,7 @@ import 
org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -198,39 +197,135 @@ public class KafkaWriterITCase {
     }
 
     @Test
-    void testNumRecordsOutErrorsCounterMetric() throws Exception {
+    void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
-        final InternalSinkWriterMetricGroup metricGroup =
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
 
-        try (final KafkaWriter<Integer> writer =
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.EXACTLY_ONCE, 
metricGroup)) {
-            final Counter numRecordsOutErrors = 
metricGroup.getNumRecordsOutErrorsCounter();
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, 
sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+
+        triggerProducerException(writer, properties);
+
+        // test flush
+        assertThatCode(() -> writer.flush(false))
+                .hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .as("the exception is not thrown again")
+                .doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
 
-            writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+    @Test
+    void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
 
-            final String transactionalId = 
writer.getCurrentProducer().getTransactionalId();
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, 
sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+
+        triggerProducerException(writer, properties);
+        // to ensure that the exceptional send request has completed
+        writer.getCurrentProducer().flush();
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .as("the exception is not thrown again")
+                .doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
 
-            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                    new FlinkKafkaInternalProducer<>(properties, 
transactionalId)) {
+    @Test
+    void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
 
-                producer.initTransactions();
-                producer.beginTransaction();
-                producer.send(new ProducerRecord<byte[], byte[]>(topic, 
"2".getBytes()));
-                producer.commitTransaction();
-            }
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, 
sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
-            writer.write(3, SINK_WRITER_CONTEXT);
-            // this doesn't throw exception because the exception is thrown in 
the Producer IO
-            // thread in unit tests due to the mock mailbox executor, while it 
would be thrown in
-            // flush() when the real mailbox executor is configured
-            writer.flush(false);
-            writer.prepareCommit();
-            assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+        triggerProducerException(writer, properties);
+        // to ensure that the exceptional send request has completed
+        writer.getCurrentProducer().flush();
+
+        while (sinkInitContext.getMailboxExecutor().tryYield()) {
+            // execute all mails
+        }
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                .as("the exception is not thrown again")
+                .doesNotThrowAnyException();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
+
+    @Test
+    void testCloseAsyncErrorPropagationAndErrorCounter() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+
+        SinkInitContext sinkInitContext =
+                new SinkInitContext(
+                        
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        timeService,
+                        null);
+        final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, 
sinkInitContext);
+        final Counter numRecordsOutErrors =
+                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
+
+        triggerProducerException(writer, properties);
+        // to ensure that the exceptional send request has completed
+        writer.getCurrentProducer().flush();
+
+        // test close
+        assertThatCode(writer::close)
+                .as("close should throw the exception from the WriterCallback")
+                .hasRootCauseExactlyInstanceOf(ProducerFencedException.class);
+        assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+    }
+
+    private void triggerProducerException(KafkaWriter<Integer> writer, 
Properties properties)
+            throws IOException {
+        final String transactionalId = 
writer.getCurrentProducer().getTransactionalId();
+
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties, transactionalId)) 
{
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<byte[], byte[]>(topic, 
"1".getBytes()));
+            producer.commitTransaction();
         }
+
+        writer.write(1, SINK_WRITER_CONTEXT);
     }
 
     @Test
@@ -387,28 +482,6 @@ public class KafkaWriterITCase {
         }
     }
 
-    @Test
-    public void testErrorPropagation() {
-        Properties properties = getKafkaClientConfiguration();
-        final KafkaWriter<Integer> writer =
-                createWriterWithConfiguration(properties, 
DeliveryGuarantee.AT_LEAST_ONCE);
-        try {
-            writer.setAsyncProducerException(
-                    new IOException("previous send request encountered 
error."));
-            assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
-                    .hasRootCauseExactlyInstanceOf(IOException.class);
-
-            writer.setAsyncProducerException(
-                    new IOException("previous send request encountered 
error."));
-            assertThatCode(() -> writer.flush(false))
-                    .hasRootCauseExactlyInstanceOf(IOException.class);
-        } finally {
-            writer.setAsyncProducerException(
-                    new IOException("previous send request encountered 
error."));
-            
assertThatCode(writer::close).hasRootCauseExactlyInstanceOf(IOException.class);
-        }
-    }
-
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) 
throws Exception {
         final Properties config = getKafkaClientConfiguration();
@@ -449,6 +522,18 @@ public class KafkaWriterITCase {
                 ImmutableList.of());
     }
 
+    private KafkaWriter<Integer> createWriterWithConfiguration(
+            Properties config, DeliveryGuarantee guarantee, SinkInitContext 
sinkInitContext) {
+        return new KafkaWriter<>(
+                guarantee,
+                config,
+                "test-prefix",
+                sinkInitContext,
+                new DummyRecordSerializer(),
+                new DummySchemaContext(),
+                ImmutableList.of());
+    }
+
     private static Properties getKafkaClientConfiguration() {
         final Properties standardProps = new Properties();
         standardProps.put("bootstrap.servers", 
KAFKA_CONTAINER.getBootstrapServers());
@@ -460,7 +545,7 @@ public class KafkaWriterITCase {
         return standardProps;
     }
 
-    private static class SinkInitContext implements Sink.InitContext {
+    private static class SinkInitContext extends TestSinkInitContext {
 
         private final SinkWriterMetricGroup metricGroup;
         private final ProcessingTimeService timeService;
@@ -480,11 +565,6 @@ public class KafkaWriterITCase {
             throw new UnsupportedOperationException("Not implemented.");
         }
 
-        @Override
-        public MailboxExecutor getMailboxExecutor() {
-            return new SyncMailboxExecutor();
-        }
-
         @Override
         public ProcessingTimeService getProcessingTimeService() {
             return timeService;

Reply via email to