Repository: flink Updated Branches: refs/heads/master 7c63526ad -> d57ebd063
[FLINK-8086][kafka] Ignore ProducerFencedException during recovery ProducerFencedException can happen if we restore twice from the same checkpoint or if we restore from an old savepoint. In both cases, the transactional.ids that we want to recoverAndCommit have already been committed and reused. Reusing means that Kafka's brokers will know them under a newer producerId/epochId, which results in a ProducerFencedException if we try to commit some old (and already committed) transaction again. Ignoring this exception might hide some bugs/issues, because instead of failing we might get a semi-silent data loss (reported only as a warning). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d57ebd06 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d57ebd06 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d57ebd06 Branch: refs/heads/master Commit: d57ebd063bad571d0ea276da5beee18aeb568b50 Parents: 34120ef Author: Piotr Nowojski <[email protected]> Authored: Fri Nov 17 14:40:30 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 20 13:24:50 2017 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer011.java | 20 ++++----- .../kafka/FlinkKafkaProducer011ITCase.java | 47 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 611a3d5..6b0136d 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -59,6 +59,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -678,14 +679,12 @@ public class FlinkKafkaProducer011<IN> protected void recoverAndCommit(KafkaTransactionState transaction) { switch (semantic) { case EXACTLY_ONCE: - FlinkKafkaProducer<byte[], byte[]> producer = - initTransactionalProducer(transaction.transactionalId, false); - producer.resumeTransaction(transaction.producerId, transaction.epoch); - try { + try (FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(transaction.transactionalId, false)) { + producer.resumeTransaction(transaction.producerId, transaction.epoch); producer.commitTransaction(); - producer.close(); } - catch (InvalidTxnStateException ex) { + catch (InvalidTxnStateException | ProducerFencedException ex) { // That means we have committed this transaction before. LOG.warn("Encountered error {} while recovering transaction {}. 
" + "Presumably this transaction has been already committed before", @@ -720,11 +719,10 @@ public class FlinkKafkaProducer011<IN> protected void recoverAndAbort(KafkaTransactionState transaction) { switch (semantic) { case EXACTLY_ONCE: - FlinkKafkaProducer<byte[], byte[]> producer = - initTransactionalProducer(transaction.transactionalId, false); - producer.resumeTransaction(transaction.producerId, transaction.epoch); - producer.abortTransaction(); - producer.close(); + try (FlinkKafkaProducer<byte[], byte[]> producer = + initTransactionalProducer(transaction.transactionalId, false)) { + producer.initTransactions(); + } break; case AT_LEAST_ONCE: case NONE: http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 922344d..07acd4f 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -219,6 +219,53 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { deleteTestTopic(topic); } + @Test(timeout = 120_000L) + public void testFailAndRecoverSameCheckpointTwice() throws Exception { + String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice"; + + OperatorStateHandles snapshot1; + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + 
testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + snapshot1 = testHarness.snapshot(1, 3); + + testHarness.processElement(44, 4); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(44, 7); + testHarness.snapshot(2, 8); + testHarness.processElement(45, 9); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(44, 7); + testHarness.snapshot(3, 8); + testHarness.processElement(45, 9); + } + + //now we should have: + // - records 42 and 43 in committed transactions + // - aborted transactions with records 44 and 45 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + deleteTestTopic(topic); + } + /** * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure, * which happened before first checkpoint and was followed up by reducing the parallelism.
