[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726385#comment-17726385 ]
Tzu-Li (Gordon) Tai commented on FLINK-32196:
---------------------------------------------

Hi [~sharonxr55], the {{abortTransactionOfSubtask}} you posted aborts transactions by relying on the fact that when you call {{initTransactions()}}, Kafka automatically aborts any old ongoing transaction under the same {{transactional.id}}.

Could you re-elaborate the producer leak? As far as I can tell, the loop reuses the same producer instance: on every iteration, that producer is reset with a new {{transactional.id}} and {{initTransactions()}} is called to abort the transaction.

> KafkaWriter recovery doesn't abort lingering transactions under the EO semantic
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32196
>                 URL: https://issues.apache.org/jira/browse/FLINK-32196
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.15.4
>            Reporter: Sharon Xie
>            Priority: Major
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is unable
> to recover from a checkpoint. The sink task is stuck in the `INITIALIZING` state and
> eventually runs OOM. The cause of the OOM is a Kafka producer thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering
> transactions upon recovery:
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the
> `TransactionAborter` doesn't abort those transactions:
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function.
> Instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch()
> == 0`, which is why we are seeing a producer thread leak as the recovery gets
> stuck in this loop.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
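To make the abort loop being discussed concrete, here is a self-contained model (no Kafka dependency) of the pattern: probe transactional ids upward, fence each one, and stop at the first id whose epoch is 0, meaning it was never used. The id scheme (`prefix-subtaskId-checkpointId`) and the `fenceAndGetEpoch` stub are illustrative assumptions; in the real `TransactionAborter`, fencing is done by resetting Flink's internal producer to the id and calling `initTransactions()`, which is what aborts any lingering transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Sketch of the epoch-terminated abort loop; not the actual Flink code. */
public class AbortLoopSketch {

    /** Assumed transactional.id scheme: prefix-subtaskId-checkpointId. */
    static String transactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /**
     * Probe checkpoint ids upward. {@code fenceAndGetEpoch} stands in for
     * "reset the producer to this transactional.id, call initTransactions()
     * (which aborts any ongoing transaction under that id), return the epoch".
     * Epoch 0 means the id was never used, so nothing lingers beyond it.
     */
    static List<String> abortLingering(String prefix, int subtaskId,
                                       ToLongFunction<String> fenceAndGetEpoch) {
        List<String> fenced = new ArrayList<>();
        for (long checkpointId = 1; ; checkpointId++) {
            String tid = transactionalId(prefix, subtaskId, checkpointId);
            if (fenceAndGetEpoch.applyAsLong(tid) == 0) {
                break; // fresh id: stop probing
            }
            fenced.add(tid);
        }
        return fenced;
    }

    public static void main(String[] args) {
        // Pretend checkpoints 1 and 2 left lingering state (epoch > 0).
        List<String> fenced = abortLingering("sink", 0,
                tid -> tid.endsWith("-1") || tid.endsWith("-2") ? 5L : 0L);
        System.out.println(fenced); // [sink-0-1, sink-0-2]
    }
}
```

If the epoch never reaches 0 (or a producer is created per iteration and not closed), this loop never terminates, matching the thread-leak hypothesis in the report above.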