[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c123e40 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c123e40 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c123e40 Branch: refs/heads/master Commit: 1c123e40e564b097fde22da648679963c40bdfe3 Parents: dc2ef4f Author: Piotr Nowojski <[email protected]> Authored: Tue Oct 24 17:57:05 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:43:20 2017 +0800 ---------------------------------------------------------------------- .../kafka/internal/FlinkKafkaProducer.java | 46 ++++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1c123e40/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java index 56b40d7..9d50379 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java @@ -181,24 +181,31 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> { } } + /** + * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones, + * so that we can resume transaction after a restart. 
Implementation of this method is based on + * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}. + */ public void resumeTransaction(long producerId, short epoch) { Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch); LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch); Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers"); + synchronized (transactionManager) { + Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers"); - invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); - invoke(sequenceNumbers, "clear"); + invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); + invoke(sequenceNumbers, "clear"); - Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch"); - setValue(producerIdAndEpoch, "producerId", producerId); - setValue(producerIdAndEpoch, "epoch", epoch); + Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch"); + setValue(producerIdAndEpoch, "producerId", producerId); + setValue(producerIdAndEpoch, "epoch", epoch); - invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); + invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); - invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); - setValue(transactionManager, "transactionStarted", true); + invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); + setValue(transactionManager, "transactionStarted", true); + } } public String getTransactionalId() { @@ -224,17 +231,30 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> { return node.id(); } + /** + * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} also adds new + * partitions to the transaction. The flushNewPartitions method moves this logic to pre-commit/flush to make + * resumeTransaction simpler. Otherwise, resumeTransaction would have to restore the state of not-yet-added ("in-flight") + * partitions. + */ private void flushNewPartitions() { LOG.info("Flushing new partitions"); - Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); + TransactionalRequestResult result = enqueueNewPartitions(); Object sender = getValue(kafkaProducer, "sender"); invoke(sender, "wakeup"); result.await(); } + private TransactionalRequestResult enqueueNewPartitions() { + Object transactionManager = getValue(kafkaProducer, "transactionManager"); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); + invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); + TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); + return result; + } + } + private static Enum<?>
getEnum(String enumFullName) { String[] x = enumFullName.split("\\.(?=[^\\.]+$)"); if (x.length == 2) {
