[
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726390#comment-17726390
]
Tzu-Li (Gordon) Tai commented on FLINK-32196:
---------------------------------------------
In terms of the lingering transactions you are observing, a few questions:
# Are you actually observing lingering transactions in Kafka that are not
being aborted? Or is that speculation based on not seeing an
{{abortTransaction()}} call in the code?
# If there are in fact lingering transactions in Kafka after restore, are they
timed out by Kafka after {{transaction.timeout.ms}}? Or do they
linger beyond the timeout threshold?
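
For reference when checking the second question: {{transaction.timeout.ms}} is a producer-side setting, and brokers reject values above their own {{transaction.max.timeout.ms}}. Below is a minimal sketch of where the value is set, assuming the producer is configured through plain java.util.Properties. The helper name, the transactional id, the bootstrap address and the 15-minute value are illustrative assumptions, not taken from the reporter's job.

```java
import java.util.Properties;

class TransactionTimeoutConfig {
    // Hypothetical helper: builds the transaction-related producer settings.
    static Properties transactionalProducerProps(String transactionalId, long timeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("transactional.id", transactionalId);
        // How long the coordinator waits before proactively aborting an open transaction.
        props.setProperty("transaction.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalProducerProps("example-prefix-0-1", 15 * 60 * 1000L);
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 900000
    }
}
```

Comparing this value with how long the transactions have actually been open should tell whether they are outliving the timeout.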
> KafkaWriter recovery doesn't abort lingering transactions under the EO
> semantic
> -------------------------------------------------------------------------------
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.4, 1.15.4
> Reporter: Sharon Xie
> Priority: Major
>
> We are seeing an issue where a Flink job using the Kafka sink under EO
> (exactly-once) is unable to recover from a checkpoint. The sink task is stuck
> in the `INITIALIZING` state and eventually runs out of memory (OOM). The
> cause of the OOM is a Kafka producer thread leak.
> Here is our best hypothesis for the issue.
> Under the EO semantic, `KafkaWriter` intends to abort lingering
> transactions upon recovery:
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the implementation in `TransactionAborter` that is supposed to
> abort those transactions doesn't actually abort them:
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function.
> Instead, it calls `producer.flush()`.
> Also, the function runs a for loop that only breaks when
> `producer.getEpoch() == 0`, which is why we are seeing a producer thread
> leak: the recovery gets stuck in this for loop.
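
To make the loop described above easier to reason about, here is a toy, in-memory model of the probing pattern. It is not the Flink or Kafka code. It assumes transactional ids of the form prefix-subtaskId-checkpointId, and that re-initializing an id on the coordinator aborts any transaction left open under it and bumps its epoch, which is how the Kafka producer documentation describes `initTransactions()`. `FakeCoordinator` and `abortLingering` are hypothetical names introduced only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

class AbortProbeSketch {
    /** Toy stand-in for a transaction coordinator; NOT Kafka or Flink code. */
    static class FakeCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();
        private final Map<String, Boolean> open = new HashMap<>();

        /** Simulates a producer that opened a transaction and then crashed. */
        void leaveLingeringTransaction(String transactionalId) {
            epochs.merge(transactionalId, 0, (old, unused) -> old + 1);
            open.put(transactionalId, true);
        }

        /** Assumed behaviour: re-initializing an id aborts its open transaction
         *  and returns the new epoch (0 if the id was never used before). */
        int initTransactions(String transactionalId) {
            open.put(transactionalId, false);
            return epochs.merge(transactionalId, 0, (old, unused) -> old + 1);
        }

        boolean hasOpenTransaction(String transactionalId) {
            return open.getOrDefault(transactionalId, false);
        }
    }

    /** Probes successive ids until one comes back with epoch 0;
     *  returns how many previously used ids were re-initialized. */
    static int abortLingering(FakeCoordinator coordinator, String prefix,
                              int subtaskId, long startCheckpointId) {
        int aborted = 0;
        for (long checkpointId = startCheckpointId; ; checkpointId++, aborted++) {
            String id = prefix + "-" + subtaskId + "-" + checkpointId; // assumed id layout
            if (coordinator.initTransactions(id) == 0) {
                break; // epoch 0: this id was unused, so stop probing
            }
        }
        return aborted;
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        coordinator.leaveLingeringTransaction("sink-0-5");
        coordinator.leaveLingeringTransaction("sink-0-6");
        int aborted = abortLingering(coordinator, "sink", 0, 5);
        System.out.println(aborted + " " + coordinator.hasOpenTransaction("sink-0-5"));
    }
}
```

In this model, `main` prints `2 false`: both used ids are re-initialized and the loop stops at the first unused one. Under these assumptions an explicit `abortTransaction()` call is not what ends a lingering transaction, and the loop terminates as soon as an unused id is probed. The model says nothing about why the reported job stalls.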
--
This message was sent by Atlassian Jira
(v8.20.10#820010)