pnowojski commented on a change in pull request #9287: [FLINK-13498][kafka] abort transactions in parallel
URL: https://github.com/apache/flink/pull/9287#discussion_r309069745
##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
##########
@@ -906,11 +907,33 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
// ----------------------------------- Utilities --------------------------
private void abortTransactions(Set<String> transactionalIds) {
- for (String transactionalId : transactionalIds) {
- try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
- initTransactionalProducer(transactionalId, false)) {
- // it suffice to call initTransactions - this will abort any lingering transactions
- kafkaProducer.initTransactions();
+ // shortcut for non-exactly-once producers
+ if (transactionalIds.isEmpty()) {
+ return;
+ }
+
+ ForkJoinPool forkJoinPool = null;
+ try {
+ // limit the number of connections to the number that is used during runtime
+ forkJoinPool = new ForkJoinPool(kafkaProducersPoolSize);
Review comment:
Hmm, when we were discussing this offline, I understood that this would be a global pool with a static size. As it is now, we could be creating a substantial number (hundreds) of threads, albeit short-lived ones. I'm not sure about this...
Crazy idea... what about making this pool `static` and shutting down idle threads?
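A minimal Java sketch of the suggestion above, assuming the abort work can be handed to one JVM-wide executor: a single `static` `ThreadPoolExecutor` capped at a fixed size, whose idle threads (core ones included, via `allowCoreThreadTimeOut(true)`) exit after a keep-alive period. This swaps the patch's per-call `ForkJoinPool` for a plain `ThreadPoolExecutor`. `SharedAbortPool`, `MAX_THREADS`, the 30-second keep-alive and the `abortOne` callback (standing in for creating a transactional producer and calling `initTransactions()`, as the removed loop did) are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SharedAbortPool {
    // Hypothetical JVM-wide cap on concurrent abort threads (not a Flink setting).
    private static final int MAX_THREADS = 8;

    // Shared by all producer instances in the JVM.
    private static final ThreadPoolExecutor POOL = createPool();

    private static ThreadPoolExecutor createPool() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "kafka-abort-" + counter.incrementAndGet());
            t.setDaemon(true); // abort threads never keep the JVM alive
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                MAX_THREADS, MAX_THREADS,   // never more than MAX_THREADS threads
                30, TimeUnit.SECONDS,       // hypothetical idle keep-alive
                new LinkedBlockingQueue<>(), // extra aborts wait in the queue
                factory);
        // Let core threads time out too, so the pool shrinks to zero
        // threads once no aborts are running.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    private SharedAbortPool() {}

    // Runs abortOne for every transactional id on the shared pool and blocks
    // until all of them have finished.
    public static void abortTransactions(Set<String> transactionalIds, Consumer<String> abortOne) {
        if (transactionalIds.isEmpty()) {
            return; // shortcut for non-exactly-once producers
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String id : transactionalIds) {
            futures.add(CompletableFuture.runAsync(() -> abortOne.accept(id), POOL));
        }
        // A failure from any abort surfaces here as a CompletionException.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
    }

    // Exposed only so callers/tests can observe the thread cap.
    public static int largestPoolSize() {
        return POOL.getLargestPoolSize();
    }
}
```

With this shape the number of abort threads is bounded per JVM rather than per producer instance, and no threads linger once recovery is done; the trade-off is that, once the cap is reached, aborts from many subtasks queue behind one another.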