[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, written record 42
2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, 
written record 43
3. checkpoint 1 completed, committing txn1, returning producerA to the pool
4. checkpoint 2 triggered, pre committing txn2, started txn3 with producerA, 
written record 44
5. crash....
6. recover to checkpoint 1, txn1 from producerA found in 
"pendingCommitTransactions", attempting to recoverAndCommit(txn1)
7. unfortunately txn1 and txn3, both from the same producer, are identical from 
the Kafka broker's perspective, and thus txn3 is being committed

The result is that both records 42 and 44 are committed, although record 44 was written after checkpoint 1 and should have been aborted on recovery.

With this fix, after re-initialization txn3 will have different 
producerId/epoch counters compared to txn1.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27564c33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27564c33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27564c33

Branch: refs/heads/release-1.4
Commit: 27564c33955d8e53f0275a5b43d4b2415ba86547
Parents: 736b908
Author: Piotr Nowojski <[email protected]>
Authored: Wed Nov 22 15:53:08 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Nov 23 14:45:07 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 85 ++++++--------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 65 +++++++++++++++
 2 files changed, 91 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27564c33/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0c741f5..b14e487 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,16 +222,11 @@ public class FlinkKafkaProducer011<IN>
        private final int kafkaProducersPoolSize;
 
        /**
-        * Available transactional ids.
+        * Pool of available transactional ids.
         */
        private final BlockingDeque<String> availableTransactionalIds = new 
LinkedBlockingDeque<>();
 
        /**
-        * Pool of KafkaProducers objects.
-        */
-       private transient Optional<ProducersPool> producersPool = 
Optional.empty();
-
-       /**
         * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
         */
        private boolean writeTimestampToKafka = false;
@@ -599,12 +593,6 @@ public class FlinkKafkaProducer011<IN>
                catch (Exception e) {
                        asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
                }
-               try {
-                       producersPool.ifPresent(pool -> pool.close());
-               }
-               catch (Exception e) {
-                       asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
-               }
                // make sure we propagate pending errors
                checkErroneous();
        }
@@ -615,7 +603,7 @@ public class FlinkKafkaProducer011<IN>
        protected KafkaTransactionState beginTransaction() throws 
FlinkKafka011Exception {
                switch (semantic) {
                        case EXACTLY_ONCE:
-                               FlinkKafkaProducer<byte[], byte[]> producer = 
createOrGetProducerFromPool();
+                               FlinkKafkaProducer<byte[], byte[]> producer = 
createTransactionalProducer();
                                producer.beginTransaction();
                                return new 
KafkaTransactionState(producer.getTransactionalId(), producer);
                        case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ public class FlinkKafkaProducer011<IN>
                }
        }
 
-       private FlinkKafkaProducer<byte[], byte[]> 
createOrGetProducerFromPool() throws FlinkKafka011Exception {
-               FlinkKafkaProducer<byte[], byte[]> producer = 
getProducersPool().poll();
-               if (producer == null) {
-                       String transactionalId = 
availableTransactionalIds.poll();
-                       if (transactionalId == null) {
-                               throw new FlinkKafka011Exception(
-                                       
FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
-                                       "Too many ongoing snapshots. Increase 
kafka producers pool size or decrease number of concurrent checkpoints.");
-                       }
-                       producer = initTransactionalProducer(transactionalId, 
true);
-                       producer.initTransactions();
-               }
-               return producer;
-       }
-
        @Override
        protected void preCommit(KafkaTransactionState transaction) throws 
FlinkKafka011Exception {
                switch (semantic) {
@@ -666,7 +639,7 @@ public class FlinkKafkaProducer011<IN>
                switch (semantic) {
                        case EXACTLY_ONCE:
                                transaction.producer.commitTransaction();
-                               getProducersPool().add(transaction.producer);
+                               
recycleTransactionalProducer(transaction.producer);
                                break;
                        case AT_LEAST_ONCE:
                        case NONE:
@@ -706,7 +679,7 @@ public class FlinkKafkaProducer011<IN>
                switch (semantic) {
                        case EXACTLY_ONCE:
                                transaction.producer.abortTransaction();
-                               getProducersPool().add(transaction.producer);
+                               
recycleTransactionalProducer(transaction.producer);
                                break;
                        case AT_LEAST_ONCE:
                        case NONE:
@@ -796,10 +769,7 @@ public class FlinkKafkaProducer011<IN>
 
                if (semantic != Semantic.EXACTLY_ONCE) {
                        nextTransactionalIdHint = null;
-                       producersPool = Optional.empty();
                } else {
-                       producersPool = Optional.of(new ProducersPool());
-
                        ArrayList<NextTransactionalIdHint> transactionalIdHints 
= Lists.newArrayList(nextTransactionalIdHintState.get());
                        if (transactionalIdHints.size() > 1) {
                                throw new IllegalStateException(
@@ -883,16 +853,33 @@ public class FlinkKafkaProducer011<IN>
                return 
currentTransaction.producer.getTransactionCoordinatorId();
        }
 
+       /**
+        * For each checkpoint we create new {@link FlinkKafkaProducer} so that 
new transactions will not clash
+        * with transactions created during previous checkpoints ({@code 
producer.initTransactions()} assures that we
+        * obtain new producerId and epoch counters).
+        */
+       private FlinkKafkaProducer<byte[], byte[]> 
createTransactionalProducer() throws FlinkKafka011Exception {
+               String transactionalId = availableTransactionalIds.poll();
+               if (transactionalId == null) {
+                       throw new FlinkKafka011Exception(
+                               FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
+                               "Too many ongoing snapshots. Increase kafka 
producers pool size or decrease number of concurrent checkpoints.");
+               }
+               FlinkKafkaProducer<byte[], byte[]> producer = 
initTransactionalProducer(transactionalId, true);
+               producer.initTransactions();
+               return producer;
+       }
+
+       private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], 
byte[]> producer) {
+               availableTransactionalIds.add(producer.getTransactionalId());
+               producer.close();
+       }
+
        private FlinkKafkaProducer<byte[], byte[]> 
initTransactionalProducer(String transactionalId, boolean registerMetrics) {
                producerConfig.put("transactional.id", transactionalId);
                return initProducer(registerMetrics);
        }
 
-       private ProducersPool getProducersPool() {
-               checkState(producersPool.isPresent(), "Trying to access 
uninitialized producer pool");
-               return producersPool.get();
-       }
-
        private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean 
registerMetrics) {
                FlinkKafkaProducer<byte[], byte[]> producer = new 
FlinkKafkaProducer<>(this.producerConfig);
 
@@ -951,7 +938,6 @@ public class FlinkKafkaProducer011<IN>
 
        private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
                in.defaultReadObject();
-               producersPool = Optional.empty();
        }
 
        private static Properties getPropertiesFromBrokerList(String 
brokerList) {
@@ -1264,25 +1250,6 @@ public class FlinkKafkaProducer011<IN>
                }
        }
 
-       static class ProducersPool implements Closeable {
-               private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], 
byte[]>> pool = new LinkedBlockingDeque<>();
-
-               public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
-                       pool.add(producer);
-               }
-
-               public FlinkKafkaProducer<byte[], byte[]> poll() {
-                       return pool.poll();
-               }
-
-               @Override
-               public void close() {
-                       while (!pool.isEmpty()) {
-                               pool.poll().close();
-                       }
-               }
-       }
-
        /**
         * Keep information required to deduce next safe to use transactional 
id.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/27564c33/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 07acd4f..a32c7f8 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -52,6 +52,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -79,6 +80,49 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                extraProperties.put("isolation.level", "read_committed");
        }
 
+       /**
+        * This test ensures that transactions reusing transactional.ids (after 
returning to the pool) will not clash
+        * with previous transactions using same transactional.ids.
+        */
+       @Test(timeout = 120_000L)
+       public void testRestoreToCheckpointAfterExceedingProducersPool() throws 
Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness1 = createTestHarness(topic)) {
+                       testHarness1.setup();
+                       testHarness1.open();
+                       testHarness1.processElement(42, 0);
+                       OperatorStateHandles snapshot = 
testHarness1.snapshot(0, 0);
+                       testHarness1.processElement(43, 0);
+                       testHarness1.notifyOfCompletedCheckpoint(0);
+                       try {
+                               for (int i = 0; i < 
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+                                       testHarness1.snapshot(i + 1, 0);
+                                       testHarness1.processElement(i, 0);
+                               }
+                               throw new IllegalStateException("This should 
not be reached.");
+                       }
+                       catch (Exception ex) {
+                               
assertIsCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex);
+                       }
+
+                       // Resume transactions before testHrness1 is being 
closed (in case of failures close() might not be called)
+                       try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness2 = createTestHarness(topic)) {
+                               testHarness2.setup();
+                               // restore from snapshot1, transactions with 
records 43 and 44 should be aborted
+                               testHarness2.initializeState(snapshot);
+                               testHarness2.open();
+                       }
+
+                       assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42), 30_000L);
+                       deleteTestTopic(topic);
+               }
+               catch (Exception ex) {
+                       // testHarness1 will be fenced off after creating and 
closing testHarness2
+                       assertIsCausedBy(ProducerFencedException.class, ex);
+               }
+       }
+
        @Test(timeout = 120_000L)
        public void testFlinkKafkaProducer011FailBeforeNotify() throws 
Exception {
                String topic = "flink-kafka-producer-fail-before-notify";
@@ -563,4 +607,25 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, 
"true");
                return properties;
        }
+
+       private void assertIsCausedBy(Class<?> clazz, Throwable ex) {
+               for (int depth = 0; depth < 50 && ex != null; depth++) {
+                       if (clazz.isInstance(ex)) {
+                               return;
+                       }
+                       ex = ex.getCause();
+               }
+               fail(String.format("Exception [%s] was not caused by [%s]", ex, 
clazz));
+       }
+
+       private void assertIsCausedBy(FlinkKafka011ErrorCode expectedErrorCode, 
Throwable ex) {
+               for (int depth = 0; depth < 50 && ex != null; depth++) {
+                       if (ex instanceof FlinkKafka011Exception) {
+                               assertEquals(expectedErrorCode, 
((FlinkKafka011Exception) ex).getErrorCode());
+                               return;
+                       }
+                       ex = ex.getCause();
+               }
+               fail(String.format("Exception [%s] was not caused by 
FlinkKafka011Exception[errorCode=%s]", ex, expectedErrorCode));
+       }
 }

Reply via email to