This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c8ee1b118c625c7653a45c9e7adb9f1def6ea2b7 Author: Arvid Heise <[email protected]> AuthorDate: Sun Sep 5 23:19:46 2021 +0200 [FLINK-24131][connectors/kafka] Improve handling of committer errors in KafkaCommitter. --- .../flink/connector/kafka/sink/KafkaCommitter.java | 53 ++++++++++++------ .../flink/connector/kafka/sink/Recyclable.java | 6 ++- .../connector/kafka/sink/KafkaCommitterTest.java | 62 +++++++++++++++++++--- 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java index 8b27980..b511126 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java @@ -18,16 +18,18 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.api.connector.sink.Committer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -39,9 +41,12 @@ import java.util.Properties; * * <p>The committer is responsible to finalize the Kafka transactions by committing them. */ -class KafkaCommitter implements Committer<KafkaCommittable> { +class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class); + public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = + "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n" + + "To avoid data loss, the application will restart."; private final Properties kafkaProducerConfig; @@ -66,17 +71,16 @@ class KafkaCommitter implements Committer<KafkaCommittable> { .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject) .orElseGet(() -> getRecoveryProducer(committable)); producer.commitTransaction(); - recyclable.ifPresent(Recyclable::close); - } catch (InvalidTxnStateException e) { + producer.flush(); + } catch (RetriableException e) { LOG.warn( - "Unable to commit recovered transaction ({}) because it's in an invalid state. " - + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", - committable, - e); - recyclable.ifPresent(Recyclable::close); + "Encountered retriable exception while committing {}.", transactionalId, e); + retryableCommittables.add(committable); + continue; } catch (ProducerFencedException e) { - LOG.warn( - "Unable to commit recovered transaction ({}) because its producer is already fenced." + // initTransaction has been called on this transaction before + LOG.error( + "Unable to commit transaction ({}) because its producer is already fenced." + " This means that you either have a different producer with the same '{}' (this is" + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)" + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," @@ -85,19 +89,34 @@ class KafkaCommitter implements Committer<KafkaCommittable> { ProducerConfig.TRANSACTIONAL_ID_CONFIG, KafkaSink.class.getSimpleName(), ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, - FlinkKafkaProducer.getTransactionTimeout(kafkaProducerConfig), + kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), + e); + } catch (InvalidTxnStateException e) { + // This exception only occurs when aborting after a commit or vice versa. + // It does not appear on double commits or double aborts. + LOG.error( + "Unable to commit transaction ({}) because it's in an invalid state. " + + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", + committable, + e); + } catch (UnknownProducerIdException e) { + LOG.error( + "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE, + committable, + e); + } catch (Exception e) { + LOG.error( + "Transaction ({}) encountered error and data has been potentially lost.", + committable, e); - recyclable.ifPresent(Recyclable::close); - } catch (Throwable e) { - LOG.warn("Cannot commit Kafka transaction, retrying.", e); - retryableCommittables.add(committable); } + recyclable.ifPresent(Recyclable::close); } return retryableCommittables; } @Override - public void close() throws Exception { + public void close() { if (recoveryProducer != null) { recoveryProducer.close(); } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java index 163592a..012fa99 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java @@ -33,10 +33,14 @@ class Recyclable<T> implements Closeable { } public T getObject() { - checkState(object != null, "Already recycled"); + checkState(!isRecycled(), "Already recycled"); return object; } + boolean isRecycled() { + return object == null; + } + @Override public void close() { recycler.accept(object); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java index 62f0cec..6ce4f24 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java @@ -19,6 +19,9 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.util.TestLogger; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import java.io.IOException; @@ -26,17 +29,62 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; /** Tests for {@link KafkaCommitter}. */ public class KafkaCommitterTest extends TestLogger { + private static final int PRODUCER_ID = 0; + private static final short EPOCH = 0; + private static final String TRANSACTIONAL_ID = "transactionalId"; + + /** Causes a network error by inactive broker and tests that a retry will happen. */ + @Test + public void testRetryCommittableOnRetriableError() throws IOException { + Properties properties = getProperties(); + try (final KafkaCommitter committer = new KafkaCommitter(properties); + FlinkKafkaInternalProducer<Object, Object> producer = + new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID); + Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable = + new Recyclable<>(producer, p -> {})) { + final List<KafkaCommittable> committables = + Collections.singletonList( + new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable)); + producer.resumeTransaction(PRODUCER_ID, EPOCH); + List<KafkaCommittable> recovered = committer.commit(committables); + assertThat(recovered, contains(committables.toArray())); + assertThat(recyclable.isRecycled(), equalTo(false)); + } + } + @Test - public void testRetryCommittableOnFailure() throws IOException { - final KafkaCommitter committer = new KafkaCommitter(new Properties()); - final short epoch = 0; - final List<KafkaCommittable> committables = - Collections.singletonList(new KafkaCommittable(0, epoch, "transactionalId", null)); - assertEquals(committables, committer.commit(committables)); + public void testRetryCommittableOnFatalError() throws IOException { + Properties properties = getProperties(); + try (final KafkaCommitter committer = new KafkaCommitter(properties); + FlinkKafkaInternalProducer<Object, Object> producer = + new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID); + Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable = + new Recyclable<>(producer, p -> {})) { + final List<KafkaCommittable> committables = + Collections.singletonList( + new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable)); + // will fail because transaction not started + List<KafkaCommittable> recovered = committer.commit(committables); + assertThat(recovered, empty()); + assertThat(recyclable.isRecycled(), equalTo(true)); + } + } + + Properties getProperties() { + Properties properties = new Properties(); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1"); + // Low timeout will fail commitTransaction quicker + properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return properties; } }
