This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d774feaafb6bbfdbcee653230515fe93d5295926
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Jul 30 16:46:54 2019 +0200

    [FLINK-13498][kafka] abort transactions in parallel
    
    This makes FlinkKafkaProducer abort transactions (e.g. during a first startup)
    in parallel, making use of idle CPU resources (using at most
    kafkaProducersPoolSize producers at once each, just like during runtime).
    
    Especially during that first startup (and thus also in tests), a lot of
    producers (5*poolSize) are created at each sink instance to abort
    potentially lingering transactions from previous runs (in most cases,
    no such transactions exist).
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 23 ++++++++++++++------
 .../connectors/kafka/FlinkKafkaProducer.java       | 25 ++++++++++++++++------
 2 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 7bc847f..d5718b7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -912,13 +912,20 @@ public class FlinkKafkaProducer011<IN>
        // ----------------------------------- Utilities 
--------------------------
 
        private void abortTransactions(Set<String> transactionalIds) {
-               for (String transactionalId : transactionalIds) {
+               transactionalIds.parallelStream().forEach(transactionalId -> {
+                       // don't mess with the original configuration or any 
other properties of the
+                       // original object
+                       // -> create an internal kafka producer on our own and 
do not rely on
+                       //    initTransactionalProducer().
+                       final Properties myConfig = new Properties();
+                       myConfig.putAll(producerConfig);
+                       initTransactionalProducerConfig(myConfig, 
transactionalId);
                        try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
-                                       
initTransactionalProducer(transactionalId, false)) {
-                               // it suffice to call initTransactions - this 
will abort any lingering transactions
+                                       new FlinkKafkaProducer<>(myConfig)) {
+                               // it suffices to call initTransactions - this 
will abort any lingering transactions
                                kafkaProducer.initTransactions();
                        }
-               }
+               });
        }
 
        int getTransactionCoordinatorId() {
@@ -952,12 +959,16 @@ public class FlinkKafkaProducer011<IN>
        }
 
        private FlinkKafkaProducer<byte[], byte[]> 
initTransactionalProducer(String transactionalId, boolean registerMetrics) {
-               producerConfig.put("transactional.id", transactionalId);
+               initTransactionalProducerConfig(producerConfig, 
transactionalId);
                return initProducer(registerMetrics);
        }
 
+       private static void initTransactionalProducerConfig(Properties 
producerConfig, String transactionalId) {
+               producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);
+       }
+
        private FlinkKafkaProducer<byte[], byte[]> 
initNonTransactionalProducer(boolean registerMetrics) {
-               producerConfig.remove("transactional.id");
+               producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
                return initProducer(registerMetrics);
        }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 605ee3f..7bf1913 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -1090,14 +1090,21 @@ public class FlinkKafkaProducer<IN>
 
        // ----------------------------------- Utilities 
--------------------------
 
-       private void abortTransactions(Set<String> transactionalIds) {
-               for (String transactionalId : transactionalIds) {
+       private void abortTransactions(final Set<String> transactionalIds) {
+               transactionalIds.parallelStream().forEach(transactionalId -> {
+                       // don't mess with the original configuration or any 
other properties of the
+                       // original object
+                       // -> create an internal kafka producer on our own and 
do not rely on
+                       //    initTransactionalProducer().
+                       final Properties myConfig = new Properties();
+                       myConfig.putAll(producerConfig);
+                       initTransactionalProducerConfig(myConfig, 
transactionalId);
                        try (FlinkKafkaInternalProducer<byte[], byte[]> 
kafkaProducer =
-                               initTransactionalProducer(transactionalId, 
false)) {
-                               // it suffice to call initTransactions - this 
will abort any lingering transactions
+                                       new 
FlinkKafkaInternalProducer<>(myConfig)) {
+                               // it suffices to call initTransactions - this 
will abort any lingering transactions
                                kafkaProducer.initTransactions();
                        }
-               }
+               });
        }
 
        int getTransactionCoordinatorId() {
@@ -1131,12 +1138,16 @@ public class FlinkKafkaProducer<IN>
        }
 
        private FlinkKafkaInternalProducer<byte[], byte[]> 
initTransactionalProducer(String transactionalId, boolean registerMetrics) {
-               producerConfig.put("transactional.id", transactionalId);
+               initTransactionalProducerConfig(producerConfig, 
transactionalId);
                return initProducer(registerMetrics);
        }
 
+       private static void initTransactionalProducerConfig(Properties 
producerConfig, String transactionalId) {
+               producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);
+       }
+
        private FlinkKafkaInternalProducer<byte[], byte[]> 
initNonTransactionalProducer(boolean registerMetrics) {
-               producerConfig.remove("transactional.id");
+               producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
                return initProducer(registerMetrics);
        }
 

Reply via email to