Jun Qin created FLINK-23410:
-------------------------------
Summary: Use a pool of KafkaProducers to commit Kafka Transactions
Key: FLINK-23410
URL: https://issues.apache.org/jira/browse/FLINK-23410
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin
Currently, {{FlinkKafkaProducer}} contains {{kafkaProducersPoolSize}} (5 by default), but {{kafkaProducersPoolSize}} is only used to calculate the next transactional IDs. There is actually no KafkaProducer pool in {{FlinkKafkaProducer}}. As a result, for every checkpoint Flink creates a new KafkaProducer (and therefore a new thread) and requests a new producer ID from Kafka before it can initialize and commit a transaction. Once the checkpoint completes and the transaction is committed, the thread is shut down. This is inefficient both in terms of Flink's CPU usage (shutting down and recreating threads) and in terms of network communication with Kafka (re-requesting a producer ID every time). This JIRA proposes actually implementing the KafkaProducer pool.
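The pooling idea can be sketched in plain Java. This is a minimal, self-contained sketch, not Flink's implementation. {{TxnProducer}}, {{FakeProducer}}, {{ProducerPool}} and {{ProducerPoolDemo}} are hypothetical names. {{TxnProducer}} is a stand-in interface that only models the transaction lifecycle; it is not the real {{org.apache.kafka.clients.producer.KafkaProducer}} API. The sketch assumes three things. First, as in a two-phase-commit sink, the next checkpoint's transaction begins before the previous one is committed. Second, a producer whose transaction has been committed can be handed back and reused for a later transaction instead of being closed. Third, transactional IDs have the hypothetical form {{prefix-N}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a transactional producer (NOT the Kafka client API).
interface TxnProducer {
    String transactionalId();
    void beginTransaction();
    void commitTransaction();
    void close();
}

// Hypothetical fake: constructing one models the expensive step
// (new thread + producer ID request) that the pool should avoid repeating.
final class FakeProducer implements TxnProducer {
    private final String id;
    private boolean inTxn = false;

    FakeProducer(String id) { this.id = id; }

    public String transactionalId() { return id; }

    public void beginTransaction() {
        if (inTxn) throw new IllegalStateException("transaction already open");
        inTxn = true;
    }

    public void commitTransaction() {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        inTxn = false;
    }

    public void close() { }
}

// Bounded pool: producers are returned after commit instead of being closed,
// so later checkpoints reuse them (and their transactional IDs).
final class ProducerPool {
    private final Deque<TxnProducer> idle = new ArrayDeque<>();
    private final List<TxnProducer> all = new ArrayList<>();
    private final Function<String, TxnProducer> factory;
    private final String idPrefix;
    private final int maxSize;

    ProducerPool(String idPrefix, int maxSize, Function<String, TxnProducer> factory) {
        this.idPrefix = idPrefix;
        this.maxSize = maxSize;
        this.factory = factory;
    }

    synchronized TxnProducer acquire() {
        TxnProducer p = idle.pollFirst();
        if (p != null) {
            return p; // reuse: no new thread, no new producer ID request
        }
        if (all.size() >= maxSize) throw new IllegalStateException("pool exhausted");
        p = factory.apply(idPrefix + "-" + all.size()); // create only when no idle producer exists
        all.add(p);
        return p;
    }

    synchronized void release(TxnProducer p) { idle.addLast(p); }

    synchronized int size() { return all.size(); }

    synchronized void closeAll() {
        for (TxnProducer p : all) p.close();
        all.clear();
        idle.clear();
    }
}

public class ProducerPoolDemo {
    public static void main(String[] args) {
        ProducerPool pool = new ProducerPool("my-sink", 5, FakeProducer::new);
        TxnProducer current = pool.acquire();
        current.beginTransaction();
        for (int checkpoint = 0; checkpoint < 100; checkpoint++) {
            TxnProducer pending = current;   // checkpoint taken: this transaction awaits completion
            current = pool.acquire();        // next transaction starts before the commit
            current.beginTransaction();
            pending.commitTransaction();     // checkpoint complete: commit
            pool.release(pending);           // keep the producer for a later checkpoint
        }
        System.out.println("100 checkpoints committed using " + pool.size() + " producer(s)");
        pool.closeAll();
    }
}
```

Under these assumptions, the demo alternates between two pooled producers, so it creates two producers for 100 checkpoints instead of one per checkpoint. Bounding the pool with {{maxSize}} also keeps the set of transactional IDs fixed, which is the kind of bound {{kafkaProducersPoolSize}} is meant to express.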