Jun Qin created FLINK-23410:
-------------------------------
Summary: Use a pool of KafkaProducers to commit Kafka Transactions
Key: FLINK-23410
URL: https://issues.apache.org/jira/browse/FLINK-23410
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin
Currently, {{FlinkKafkaProducer}} has a {{kafkaProducersPoolSize}} setting (5 by
default), but {{kafkaProducersPoolSize}} is only used to calculate the next
transactional IDs. There is no actual KafkaProducer pool in
{{FlinkKafkaProducer}}. As a result, for every checkpoint Flink creates a new
KafkaProducer (and therefore a new thread) and obtains a new producer ID from
Kafka before it can initialize and commit a transaction. When the checkpoint
completes and the transaction is committed, the thread is shut down. This is
inefficient both in terms of Flink's CPU usage (shutting down and recreating
threads) and in terms of network communication with Kafka (re-requesting a
producer ID for every checkpoint). This JIRA proposes implementing an actual
KafkaProducer pool.
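A minimal sketch of the proposed idea, assuming a hypothetical {{TransactionalProducer}} interface that stands in for {{KafkaProducer}} (this is not the Kafka client API and not Flink's implementation). The pool keeps idle producers keyed by transactional ID, calls {{initTransactions()}} (the step where Kafka assigns a producer ID) only when a producer is first created, and hands the same producer back on later checkpoints instead of closing it. {{ProducerPool}} and {{CountingProducer}} are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a transactional Kafka producer (NOT the Kafka client API).
interface TransactionalProducer extends AutoCloseable {
    String transactionalId();
    void initTransactions();  // in Kafka, this is where a producer ID is obtained from the broker
    void beginTransaction();
    void commitTransaction();
    @Override
    void close();             // for a real producer, this would also stop its network thread
}

// Hypothetical pool that reuses one initialized producer per transactional ID
// instead of creating (and closing) a new producer for every checkpoint.
final class ProducerPool implements AutoCloseable {
    private final Function<String, TransactionalProducer> factory;
    private final Map<String, TransactionalProducer> idle = new HashMap<>();
    private int created = 0;

    ProducerPool(Function<String, TransactionalProducer> factory) {
        this.factory = factory;
    }

    // Reuses an idle producer for this ID, or creates and initializes one on first use.
    synchronized TransactionalProducer acquire(String transactionalId) {
        TransactionalProducer producer = idle.remove(transactionalId);
        if (producer == null) {
            producer = factory.apply(transactionalId);
            producer.initTransactions(); // paid once per ID, not once per checkpoint
            created++;
        }
        return producer;
    }

    // Called after the transaction is committed: keep the producer instead of closing it.
    synchronized void release(TransactionalProducer producer) {
        TransactionalProducer previous = idle.put(producer.transactionalId(), producer);
        if (previous != null && previous != producer) {
            previous.close(); // only reachable if the same ID was acquired twice concurrently
        }
    }

    synchronized int createdCount() {
        return created;
    }

    @Override
    public synchronized void close() {
        idle.values().forEach(TransactionalProducer::close);
        idle.clear();
    }
}

// Fake producer for illustration only: counts initTransactions() calls.
final class CountingProducer implements TransactionalProducer {
    static int initCalls = 0;
    private final String id;

    CountingProducer(String id) { this.id = id; }

    @Override public String transactionalId() { return id; }
    @Override public void initTransactions() { initCalls++; }
    @Override public void beginTransaction() { }
    @Override public void commitTransaction() { }
    @Override public void close() { }
}
```

If five transactional IDs are cycled over ten checkpoints (acquire, begin, commit, release), this sketch creates and initializes five producers rather than ten. A real implementation would also have to deal with aborted transactions, fenced producers and producer failures, which the sketch ignores.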