[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c123e40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c123e40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c123e40

Branch: refs/heads/master
Commit: 1c123e40e564b097fde22da648679963c40bdfe3
Parents: dc2ef4f
Author: Piotr Nowojski <[email protected]>
Authored: Tue Oct 24 17:57:05 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:43:20 2017 +0800

----------------------------------------------------------------------
 .../kafka/internal/FlinkKafkaProducer.java      | 46 ++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c123e40/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 56b40d7..9d50379 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -181,24 +181,31 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
                }
        }
 
+       /**
+        * Instead of obtaining producerId and epoch from the transaction 
coordinator, re-use previously obtained ones,
+        * so that we can resume transaction after a restart. Implementation of 
this method is based on
+        * {@link 
org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+        */
        public void resumeTransaction(long producerId, short epoch) {
                Preconditions.checkState(producerId >= 0 && epoch >= 0, 
"Incorrect values for producerId {} and epoch {}", producerId, epoch);
                LOG.info("Attempting to resume transaction with producerId {} 
and epoch {}", producerId, epoch);
 
                Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-               Object sequenceNumbers = getValue(transactionManager, 
"sequenceNumbers");
+               synchronized (transactionManager) {
+                       Object sequenceNumbers = getValue(transactionManager, 
"sequenceNumbers");
 
-               invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-               invoke(sequenceNumbers, "clear");
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+                       invoke(sequenceNumbers, "clear");
 
-               Object producerIdAndEpoch = getValue(transactionManager, 
"producerIdAndEpoch");
-               setValue(producerIdAndEpoch, "producerId", producerId);
-               setValue(producerIdAndEpoch, "epoch", epoch);
+                       Object producerIdAndEpoch = 
getValue(transactionManager, "producerIdAndEpoch");
+                       setValue(producerIdAndEpoch, "producerId", producerId);
+                       setValue(producerIdAndEpoch, "epoch", epoch);
 
-               invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
 
-               invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-               setValue(transactionManager, "transactionStarted", true);
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+                       setValue(transactionManager, "transactionStarted", 
true);
+               }
        }
 
        public String getTransactionalId() {
@@ -224,17 +231,30 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
                return node.id();
        }
 
+       /**
+        * Besides committing {@link 
org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also 
adding new
+        * partitions to the transaction. flushNewPartitions method is moving 
this logic to pre-commit/flush, to make
+        * resumeTransaction simpler. Otherwise resumeTransaction would require 
to restore state of the not yet added/"in-flight"
+        * partitions.
+        */
        private void flushNewPartitions() {
                LOG.info("Flushing new partitions");
-               Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
-               Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-               invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-               TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
+               TransactionalRequestResult result = enqueueNewPartitions();
                Object sender = getValue(kafkaProducer, "sender");
                invoke(sender, "wakeup");
                result.await();
        }
 
+       private TransactionalRequestResult enqueueNewPartitions() {
+               Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
+               synchronized (transactionManager) {
+                       Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
+                       invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
+                       TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
+                       return result;
+               }
+       }
+
        private static Enum<?> getEnum(String enumFullName) {
                String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
                if (x.length == 2) {

Reply via email to