Repository: flink
Updated Branches:
  refs/heads/master 7c63526ad -> d57ebd063


[FLINK-8086][kafka] Ignore ProducerFencedException during recovery

ProducerFencedException can happen if we restore twice from the same checkpoint
or if we restore from an old savepoint. In both cases transactional.ids that we
want to recoverAndCommit have been already committed and reused. Reusing means
that they will be known by Kafka's brokers under newer producerId/epochId,
which will result in ProducerFencedException if we try to commit again some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of failing
we might have semi-silent data loss (reported only by a warning).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d57ebd06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d57ebd06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d57ebd06

Branch: refs/heads/master
Commit: d57ebd063bad571d0ea276da5beee18aeb568b50
Parents: 34120ef
Author: Piotr Nowojski <[email protected]>
Authored: Fri Nov 17 14:40:30 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 20 13:24:50 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 20 ++++-----
 .../kafka/FlinkKafkaProducer011ITCase.java      | 47 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 611a3d5..6b0136d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -678,14 +679,12 @@ public class FlinkKafkaProducer011<IN>
        protected void recoverAndCommit(KafkaTransactionState transaction) {
                switch (semantic) {
                        case EXACTLY_ONCE:
-                               FlinkKafkaProducer<byte[], byte[]> producer =
-                                       
initTransactionalProducer(transaction.transactionalId, false);
-                               
producer.resumeTransaction(transaction.producerId, transaction.epoch);
-                               try {
+                               try (FlinkKafkaProducer<byte[], byte[]> 
producer =
+                                               
initTransactionalProducer(transaction.transactionalId, false)) {
+                                       
producer.resumeTransaction(transaction.producerId, transaction.epoch);
                                        producer.commitTransaction();
-                                       producer.close();
                                }
-                               catch (InvalidTxnStateException ex) {
+                               catch (InvalidTxnStateException | 
ProducerFencedException ex) {
                                        // That means we have committed this 
transaction before.
                                        LOG.warn("Encountered error {} while 
recovering transaction {}. " +
                                                "Presumably this transaction 
has been already committed before",
@@ -720,11 +719,10 @@ public class FlinkKafkaProducer011<IN>
        protected void recoverAndAbort(KafkaTransactionState transaction) {
                switch (semantic) {
                        case EXACTLY_ONCE:
-                               FlinkKafkaProducer<byte[], byte[]> producer =
-                                       
initTransactionalProducer(transaction.transactionalId, false);
-                               
producer.resumeTransaction(transaction.producerId, transaction.epoch);
-                               producer.abortTransaction();
-                               producer.close();
+                               try (FlinkKafkaProducer<byte[], byte[]> 
producer =
+                                               
initTransactionalProducer(transaction.transactionalId, false)) {
+                                       producer.initTransactions();
+                               }
                                break;
                        case AT_LEAST_ONCE:
                        case NONE:

http://git-wip-us.apache.org/repos/asf/flink/blob/d57ebd06/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 922344d..07acd4f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -219,6 +219,53 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                deleteTestTopic(topic);
        }
 
+       @Test(timeout = 120_000L)
+       public void testFailAndRecoverSameCheckpointTwice() throws Exception {
+               String topic = 
"flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
+
+               OperatorStateHandles snapshot1;
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       testHarness.open();
+                       testHarness.processElement(42, 0);
+                       testHarness.snapshot(0, 1);
+                       testHarness.processElement(43, 2);
+                       snapshot1 = testHarness.snapshot(1, 3);
+
+                       testHarness.processElement(44, 4);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       // restore from snapshot1, transactions with records 44 
and 45 should be aborted
+                       testHarness.initializeState(snapshot1);
+                       testHarness.open();
+
+                       // write and commit more records, after potentially 
lingering transactions
+                       testHarness.processElement(44, 7);
+                       testHarness.snapshot(2, 8);
+                       testHarness.processElement(45, 9);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       // restore from snapshot1, transactions with records 44 
and 45 should be aborted
+                       testHarness.initializeState(snapshot1);
+                       testHarness.open();
+
+                       // write and commit more records, after potentially 
lingering transactions
+                       testHarness.processElement(44, 7);
+                       testHarness.snapshot(3, 8);
+                       testHarness.processElement(45, 9);
+               }
+
+               //now we should have:
+               // - records 42 and 43 in committed transactions
+               // - aborted transactions with records 44 and 45
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42, 43), 30_000L);
+               deleteTestTopic(topic);
+       }
+
        /**
         * This tests checks whether FlinkKafkaProducer011 correctly aborts 
lingering transactions after a failure,
         * which happened before first checkpoint and was followed up by 
reducing the parallelism.

Reply via email to