[ 
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hilmi Al Fatih updated FLINK-34554:
-----------------------------------
    Issue Type: Improvement  (was: New Feature)

> Using EXACTLY_ONCE with KafkaSink causes broker OOM due to newly created 
> transactionalIds per checkpoint
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34554
>                 URL: https://issues.apache.org/jira/browse/FLINK-34554
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.3, 1.17.2, 1.18.1
>            Reporter: Hilmi Al Fatih
>            Priority: Blocker
>             Fix For: 1.16.3, 1.17.2, 1.18.1
>
>         Attachments: image (4).png, image (5).png
>
>
> Flink version: 1.17.1
> Kafka Broker version: 2.7.1 (4 GB heap memory for each broker)
> Hi,
> We recently had an outage in our production system after we performed a Flink 
> Kafka connector API upgrade. To give a brief context, our application is a 
> simple Kafka-to-Kafka pipeline with minimal processing. We run in 
> EXACTLY_ONCE mode, so Kafka transactions are involved.
> Our application runs with around 350 sink subtasks in total. The checkpoint 
> period was set to 5 seconds to avoid blocking {{read_committed}} consumers for 
> too long. We recently performed an upgrade with the following details:
>  * Previous state:
>  ** Flink version: 1.14.4
>  ** Broker version: 2.7.1
>  ** Kafka connector API: FlinkKafkaProducer
>  * Updated to:
>  ** Flink version: 1.17.1
>  ** Broker version: 2.7.1
>  ** Kafka connector API: KafkaSink
> Around 10 hours after the deployment, our Kafka brokers started failing with 
> OOM errors. The heap dumps are dominated by ProducerStateEntry records.
> Our investigation led us to the fundamental implementation change between 
> FlinkKafkaProducer and KafkaSink (see the sketch below):
>  * KafkaSink generates a different transactionalId for each checkpoint,
>  * FlinkKafkaProducer reuses a fixed pool of transactionalIds.
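> For illustration, here is a minimal sketch of the two id schemes as we 
> understand them; the names and the exact id format are our assumption, not the 
> connectors' actual code:
> {code:java}
> // Illustrative sketch only -- names and id format are assumptions.
> public class TransactionalIdSchemes {
> 
>     // KafkaSink (Flink >= 1.15): a fresh id per subtask *and* per checkpoint,
>     // so the set of ids the broker has seen grows without bound over time.
>     static String kafkaSinkStyleId(String prefix, int subtaskId, long checkpointId) {
>         return prefix + "-" + subtaskId + "-" + checkpointId;
>     }
> 
>     // FlinkKafkaProducer (legacy): a fixed pool of ids per subtask, recycled
>     // across checkpoints, so the broker only ever sees a bounded set.
>     static String legacyPooledId(String prefix, int subtaskId, int slotInPool) {
>         return prefix + "-" + subtaskId + "-" + slotInPool; // slotInPool in [0, poolSize)
>     }
> }
> {code}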
> With this behavior, KafkaSink exhausted our broker heap very quickly, and the 
> ProducerStateEntry records only expire after 
> {{transactional.id.expiration.ms}}, which is set to 7 days by default 
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
>  [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
>  [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207]).
> For our job, that means it creates roughly:
>  * after 10 hours of running: 350 ids * 12 checkpoints/minute * 60 min/hour * 10 hours ~ 2,520,000 entries
>  * after 7 days (the default expiration): ~ 42 million entries
> Attached is the number of ProducerStateEntry entries in the heap dump taken at 
> the time of the OOM: 505,000 (6.5%); in total it would be roughly ~ 7,000,000 
> entries.
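> The back-of-the-envelope math, as a small snippet:
> {code:java}
> // Estimate of ProducerStateEntry growth with the numbers from our job.
> public class EntryEstimate {
>     public static void main(String[] args) {
>         long sinkSubtasks = 350;
>         long checkpointsPerMinute = 60 / 5;                          // 5 s checkpoint period -> 12/min
>         long idsPerHour = sinkSubtasks * checkpointsPerMinute * 60;  // 252,000 new ids per hour
>         System.out.println("after 10 hours: " + idsPerHour * 10);     // 2,520,000
>         System.out.println("after 7 days:   " + idsPerHour * 24 * 7); // 42,336,000
>     }
> }
> {code}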
> There are several things that came to mind to mitigate the problem, such as 
> (see the sketch after this list):
>  * reduce the number of sink subtasks, which reduces the number of transactionalIds,
>  * enlarge the checkpoint period to reduce the rate of newly generated transactionalIds,
>  * shorten {{transactional.id.expiration.ms}} to expire unused transactionalIds sooner,
>  * increase the broker heap.
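> As an example of the second mitigation, this is roughly what we would change 
> on our side (broker address, topic and prefix are placeholders; the APIs are 
> the standard KafkaSink builder from flink-connector-kafka):
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class SlowerCheckpointsExample {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpoint every 60 s instead of every 5 s: ~12x fewer new
>         // transactionalIds per hour, at the cost of read_committed latency.
>         env.enableCheckpointing(60_000);
> 
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>                 .setBootstrapServers("broker:9092")                   // placeholder
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic("output-topic")                     // placeholder
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .setTransactionalIdPrefix("my-app")                   // placeholder
>                 .build();
>         // ... build the rest of the pipeline and sinkTo(sink) as before.
>     }
> }
> {code}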
> However, the above mitigations might be too cumbersome and need careful 
> tuning, which harms our flexibility.
> In addition, because no state about lingering transactions is kept, the 
> TransactionAborter seems to abort old transactions naively. We might 
> accidentally (or purposefully) reuse the same transactionalIdPrefix and start 
> the counter from 0. In that case, if an old transactionalId happens to have 
> epoch > 0, the aborter keeps looping, aborting non-existent transactions up to 
> the latest checkpoint counter (which may be very large), and the job gets 
> stuck; a rough sketch of that loop is shown below.
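> (This is a simplified, hypothetical sketch of the behavior described above; the 
> types and helper names are illustrative, not the actual connector API:)
> {code:java}
> import java.util.function.Function;
> 
> // ProbeProducer stands in for the connector's internal transactional producer.
> interface ProbeProducer extends AutoCloseable {
>     void initTransactions();   // fences and aborts any lingering transaction for the id
>     short getEpoch();          // producer epoch returned by the broker for this id
>     @Override void close();
> }
> 
> class AbortLoopSketch {
>     static void abortTransactionsOfSubtask(String prefix, int subtaskId, long startCheckpointId,
>                                            Function<String, ProbeProducer> producerFactory) {
>         for (long checkpointId = startCheckpointId; ; checkpointId++) {
>             String transactionalId = prefix + "-" + subtaskId + "-" + checkpointId;
>             try (ProbeProducer producer = producerFactory.apply(transactionalId)) {
>                 producer.initTransactions();
>                 if (producer.getEpoch() == 0) {
>                     break; // id never used -> assume nothing newer remains to abort
>                 }
>                 // epoch > 0: an older job used this id. With a reused prefix and a large
>                 // old checkpoint counter, the loop keeps probing ids for a very long time
>                 // and the restoring job appears stuck.
>             }
>         }
>     }
> }
> {code}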
> By the way, I am aware that in Flink 2.0 a lot of effort is going into a better 
> integration with Kafka transactions 
> ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
> FLIP-319 mentions TID pooling, but there does not seem to be a relevant page 
> for it yet, so I wonder whether there is already a concrete plan that I can 
> follow; if there is something I can contribute to, I would be really happy to 
> help.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
