[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726479#comment-17726479 ]

Sharon Xie commented on FLINK-32196:
------------------------------------

[~tzulitai] Thanks for analyzing the logs. As additional context, this happened 
after a security patch on the broker side. Though most of the jobs 
auto-recovered, we found a couple that got stuck in the recovery step. So there 
is a chance that this is caused by an issue on the broker side - e.g. some 
broker-side transaction state was lost, or a partition is in a bad state. Any 
possible explanation here?

A couple of other questions.

> These are the ones to abort in step 2. Initializing the transaction 
> automatically aborts the transaction, as I mentioned in earlier comments. So 
> I believe this is also expected.

In this case, do the transaction state 
[messages|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] in 
the Kafka broker look right to you? There seems to be no change in those 
messages except the epoch and txnLastUpdateTimestamp. I guess the idea is to 
call the transaction init with the old txnId and just let it time out - but is 
there some heartbeat that updates the transaction? Also, can you please explain 
a bit why abortTransaction is not used?
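
To make sure I understand the mechanism, here is a rough sketch with the plain 
Kafka client of what I take "initializing the transaction aborts it" to mean. 
This is only my own illustration, not the Flink code; the class and the 
abortLingering helper below are made up for the example:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortLingeringTxnSketch {

    // Abort a transaction left behind by a previous producer session by
    // re-initializing a producer with the same transactional.id.
    static void abortLingering(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // InitProducerId bumps the epoch for this transactional.id on the
            // broker, which fences the previous producer and aborts its
            // in-flight transaction.
            producer.initTransactions();
            // abortTransaction() is not applicable in this session: it never
            // called beginTransaction(), so there is nothing local to abort.
        }
    }
}
{code}

If that model is right, abortTransaction() couldn't even be called here, because 
the recovering producer session never began the old transaction - but please 
correct me if I'm missing something.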

> What is NOT expected, though, is the bunch of kafka-producer-network-thread 
> threads being spawned per TID to abort in step 2.

Is it common to have so many lingering transactions that need to be aborted? 
The job is not a high-throughput one - about 3 records/sec with a checkpointing 
interval of 10 sec. It takes ~30 min to run OOM, and it seems odd that the 
Kafka sink would need that long to recover.

> kafka sink under EO sometimes is unable to recover from a checkpoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-32196
>                 URL: https://issues.apache.org/jira/browse/FLINK-32196
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.15.4
>            Reporter: Sharon Xie
>            Priority: Major
>         Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs OOM. The OOM is caused by a Kafka 
> producer thread leak.
> Here is our best *hypothesis* for the issue.
> Under the EO semantic, `KafkaWriter` intends to abort lingering transactions 
> upon recovery 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in `TransactionAborter` doesn't abort 
> those transactions 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function runs in a for loop that only breaks when 
> `producer.getEpoch() == 0`, which is why we are seeing a producer thread leak 
> as the recovery gets stuck in this loop.
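
A minimal, self-contained sketch of the loop shape described in the hypothesis 
above - this is NOT the actual `TransactionAborter` code; the TxnProducer 
interface, the producerFactory parameter, and the class name are stand-ins - 
just to illustrate how a loop keyed only on the epoch, which flushes instead of 
aborting and never closes its producers, would keep creating producers (and 
their kafka-producer-network-thread threads):

{code:java}
import java.util.function.LongFunction;

public class AbortLoopSketch {

    // Stand-in for the connector's internal transactional producer;
    // only the control flow below matters.
    interface TxnProducer {
        void initTransactions(); // bumps the epoch, aborting any lingering txn
        void flush();            // what the linked code calls instead of abortTransaction()
        short getEpoch();        // epoch of the producer id after init
    }

    // producerFactory maps a checkpoint id to a producer for the
    // corresponding transactional id (id layout assumed, not verified).
    static void abortLingeringTransactions(LongFunction<TxnProducer> producerFactory,
                                           long startCheckpointId) {
        for (long checkpointId = startCheckpointId; ; checkpointId++) {
            // Each iteration creates a new producer; nothing in this loop
            // closes it, so each one keeps its own network thread alive.
            TxnProducer producer = producerFactory.apply(checkpointId);
            producer.initTransactions();
            producer.flush();              // abortTransaction() is never called
            if (producer.getEpoch() == 0) {
                // The only exit condition; if the broker never reports
                // epoch 0 for a probed id, the loop spins forever.
                break;
            }
        }
    }
}
{code}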



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
