[
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891376#comment-17891376
]
Jake.zhang edited comment on FLINK-36569 at 10/21/24 2:24 AM:
--------------------------------------------------------------
Yes, it create `getRecoveryProducer` each time, `recyclable` object always
null, so `recyclable.ifPresent(Recyclable::close)` not work.
`org.apache.flink.connector.kafka.sink.KafkaCommitter`
{code:java}
producer =
recyclable
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
producer.flush();
recyclable.ifPresent(Recyclable::close);{code}
was (Author: ft20082):
Yes, it create `getRecoveryProducer` each time, `recyclable` object always
null, so `recyclable.ifPresent(Recyclable::close)` not work.
{code:java}
producer =
recyclable
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
producer.flush();
recyclable.ifPresent(Recyclable::close);{code}
> flink kafka connector do not close kafka produer when it checkpoint success
> ---------------------------------------------------------------------------
>
> Key: FLINK-36569
> URL: https://issues.apache.org/jira/browse/FLINK-36569
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.19.0, 1.20.0
> Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20
> Reporter: Jake.zhang
> Priority: Major
> Attachments: image-2024-10-18-13-31-39-253.png
>
>
> flink kafka connector do't close FlinkKafkaInternalProducer when flink
> checkpoint success in flink 1.20/1.19 . it will create one
> FlinkKafkaInternalProducer per checkpoint.
>
> FlinkKafkaInternalProducer do not close automatic. so kafka producer network
> thread will more and more
>
> !image-2024-10-18-13-31-39-253.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)