[
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735
]
Jake.zhang edited comment on FLINK-36569 at 10/22/24 6:34 AM:
--------------------------------------------------------------
Adding a scheduled close service in the
`org.apache.flink.connector.kafka.sink.KafkaWriter` constructor, which retains
only the producers of the 5 most recent checkpoints, works. Note that the
producers still need to wait for the Kafka transaction timeout.
{code:java}
initFlinkMetrics();
// Scheduled task: check whether there are producers of old checkpoint ids to close.
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
        () -> {
            try {
                LOG.info("last checkpoint id: {}", lastCheckpointId);
                // Keep the producers up to the previous checkpoint, i.e.
                // successCheckpointId - 1 is the newest transaction to retain.
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
                while ((flinkKafkaInternalProducer =
                                (FlinkKafkaInternalProducer) this.producerCloseables.peek())
                        != null) {
                    String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                    assert transactionId != null;
                    String[] transactionIdArr = transactionId.split("-");
                    long itemCheckpointId =
                            Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                    if (lastCheckpointId - 5 > itemCheckpointId) {
                        // Poll the producer off the queue and close it.
                        try {
                            FlinkKafkaInternalProducer closeable =
                                    (FlinkKafkaInternalProducer) this.producerCloseables.poll();
                            closeable.close();
                            LOG.info(
                                    "close producer transaction id: {}",
                                    closeable.getTransactionalId());
                        } catch (Exception e) {
                            LOG.warn("FlinkKafkaInternalProducer close error", e);
                        }
                    } else {
                        // Wait for the next check.
                        break;
                    }
                }
            } catch (Exception e) {
                LOG.warn("schedule auto close producer error", e);
            }
        },
        60,
        60,
        TimeUnit.SECONDS);
{code}
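For reference, the checkpoint id parsed above is assumed to be the last `-`-separated segment of the transactional id (roughly `<prefix>-<subtaskId>-<checkpointId>`). A minimal, self-contained sketch of just that extraction step (`TransactionalIdParser` is a hypothetical name, not part of the connector):

{code:java}
// Hypothetical helper mirroring the split/parse logic above; assumes the
// transactional id ends with the checkpoint id after the last '-'.
public class TransactionalIdParser {

    static long checkpointIdOf(String transactionalId) {
        String[] parts = transactionalId.split("-");
        return Long.parseLong(parts[parts.length - 1]);
    }

    public static void main(String[] args) {
        // e.g. "my-sink-3-42" -> checkpoint id 42
        System.out.println(checkpointIdOf("my-sink-3-42")); // prints 42
    }
}
{code}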
> flink kafka connector does not close kafka producer when checkpoint succeeds
> ----------------------------------------------------------------------------
>
> Key: FLINK-36569
> URL: https://issues.apache.org/jira/browse/FLINK-36569
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.19.0, 1.20.0
> Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20
> Reporter: Jake.zhang
> Priority: Major
> Attachments: image-2024-10-18-13-31-39-253.png,
> image-2024-10-21-14-02-41-823.png
>
>
> flink kafka connector does not close FlinkKafkaInternalProducer when a flink
> checkpoint succeeds in flink 1.20/1.19. It creates one
> FlinkKafkaInternalProducer per checkpoint.
>
> FlinkKafkaInternalProducer is not closed automatically, so the number of kafka
> producer network threads keeps growing.
> It calls `getRecoveryProducer` each time; the `recyclable` object is always
> empty, so `recyclable.ifPresent(Recyclable::close)` does nothing.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`
> {code:java}
> producer =
>         recyclable
>                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
>                 .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> {code}
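> A minimal sketch (with a plain `Optional<Object>` standing in for the real `Recyclable`, which is an assumption for illustration) of why the final close is a no-op on the recovery path, where the Optional is empty:
> {code:java}
> import java.util.Optional;
>
> public class RecyclableNoOp {
>
>     // Returns true only if ifPresent actually ran its callback.
>     static boolean closeAttempted(Optional<Object> recyclable) {
>         boolean[] closed = {false};
>         recyclable.ifPresent(r -> closed[0] = true);
>         return closed[0];
>     }
>
>     public static void main(String[] args) {
>         // Recovery path: recyclable is empty, so close is never attempted.
>         System.out.println(closeAttempted(Optional.empty())); // prints false
>     }
> }
> {code}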
>
> !image-2024-10-21-14-02-41-823.png!
>
> !image-2024-10-18-13-31-39-253.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)