pnowojski commented on a change in pull request #9287: [FLINK-13498][kafka] abort transactions in parallel
URL: https://github.com/apache/flink/pull/9287#discussion_r309070252
##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
##########
@@ -906,11 +907,33 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
     // ----------------------------------- Utilities --------------------------
     private void abortTransactions(Set<String> transactionalIds) {
-        for (String transactionalId : transactionalIds) {
-            try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
-                    initTransactionalProducer(transactionalId, false)) {
-                // it suffice to call initTransactions - this will abort any lingering transactions
-                kafkaProducer.initTransactions();
+        // shortcut for non-exactly-once producers
+        if (transactionalIds.isEmpty()) {
+            return;
+        }
+
+        ForkJoinPool forkJoinPool = null;
+        try {
+            // limit the number of connections to the number that is used during runtime
+            forkJoinPool = new ForkJoinPool(kafkaProducersPoolSize);
+            forkJoinPool.submit(() ->
+                transactionalIds.parallelStream().forEach(transactionalId -> {
+                    // don't mess with the original configuration or any other properties of the
+                    // original object
+                    // -> create an internal kafka producer on our own and do not rely on
+                    // initTransactionalProducer().
+                    final Properties myConfig = new Properties();
+                    myConfig.putAll(producerConfig);
+                    myConfig.put("transactional.id", transactionalId);
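For context, the pattern in this hunk (running a parallel stream from inside a dedicated `ForkJoinPool`, so that it does not use the common pool and its parallelism follows the producer pool size) can be sketched in isolation. This is a minimal, self-contained sketch, not the PR's code: `ParallelAbortSketch`, `abortAll` and `abortTransaction` are hypothetical names, `poolSize` plays the role of `kafkaProducersPoolSize`, and `abortTransaction` only records the id instead of creating a Kafka producer and calling `initTransactions()`. The sketch waits on the submitted task and shuts the pool down in `finally`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class ParallelAbortSketch {

    // Hypothetical stand-in for "create a producer for this transactional.id and call
    // initTransactions()"; here it only records which id was handled.
    static void abortTransaction(String transactionalId, Set<String> aborted) {
        aborted.add(transactionalId);
    }

    // Processes every id on a dedicated pool of the given size and returns the handled ids.
    static Set<String> abortAll(Set<String> transactionalIds, int poolSize) {
        // Thread-safe set, because abortTransaction runs on several worker threads.
        Set<String> aborted = ConcurrentHashMap.newKeySet();
        // Same shortcut as in the diff: nothing to abort for non-exactly-once producers.
        if (transactionalIds.isEmpty()) {
            return aborted;
        }
        ForkJoinPool pool = new ForkJoinPool(poolSize);
        try {
            // The parallel stream is started from a task of this pool, so in practice its
            // subtasks run here instead of in ForkJoinPool.commonPool() (JDK behavior).
            pool.submit(() ->
                    transactionalIds.parallelStream()
                            .forEach(id -> abortTransaction(id, aborted)))
                    // Block until every id is processed; a task failure is rethrown unchecked.
                    .join();
        } finally {
            // Release the pool's worker threads.
            pool.shutdown();
        }
        return aborted;
    }

    public static void main(String[] args) {
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < 10; i++) {
            ids.add("txn-" + i);
        }
        System.out.println(abortAll(ids, 3).size()); // prints 10
    }
}
```

Note that `new ForkJoinPool(parallelism)` sets a target parallelism rather than a hard thread limit: the pool may add compensating threads when workers block, so it bounds connections approximately, not strictly.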
Review comment:
Can you deduplicate the config-setting code with `initTransactionalProducer`? Yes, right now it's only a single `put("transactional.id", transactionalId);` call, but that can change at some point, and this kind of code duplication makes the code fragile.
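One possible shape for that deduplication, sketched under assumptions: move the copy-and-set logic into a single helper that both `initTransactionalProducer` and `abortTransactions` call. `TransactionalConfigSketch` and `createTransactionalProducerConfig` are hypothetical names, and the sketch uses plain `java.util.Properties` so it runs without the Kafka client; `"transactional.id"` is the same key the diff sets.

```java
import java.util.Properties;

public class TransactionalConfigSketch {

    // Hypothetical shared helper: the one place that derives a transactional producer
    // config. Both initTransactionalProducer(...) and abortTransactions(...) would call it.
    static Properties createTransactionalProducerConfig(Properties producerConfig, String transactionalId) {
        // Work on a copy so the shared producerConfig is never mutated.
        Properties myConfig = new Properties();
        myConfig.putAll(producerConfig);
        // Kafka's producer key for the transactional id (ProducerConfig.TRANSACTIONAL_ID_CONFIG).
        myConfig.put("transactional.id", transactionalId);
        return myConfig;
    }

    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092");

        Properties txnConfig = createTransactionalProducerConfig(producerConfig, "txn-0");
        System.out.println(txnConfig.getProperty("transactional.id"));      // prints txn-0
        System.out.println(producerConfig.containsKey("transactional.id")); // prints false
    }
}
```

Returning a fresh copy keeps the "don't mess with the original configuration" property from the diff, while any future per-transaction setting only has to be added in one place.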
----------------------------------------------------------------
This is an automated message from the Apache Git Service.