[ https://issues.apache.org/jira/browse/FLINK-37375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937879#comment-17937879 ]

Jufang He commented on FLINK-37375:
-----------------------------------

[~zakelly] Yes, in this design, the async operations must finish before the 
checkpoint is marked complete.

Perhaps a more detailed explanation of the specific issue we encountered will 
make this design easier to understand. In our scenario of using Flink to write 
data to Paimon, we ran into slow DFS nodes. Committing Paimon data is tightly 
integrated with Flink's checkpointing mechanism: the writer operator flushes 
data to DFS during `prepareSnapshotPreBarrier`, and after the flush completes 
it generates metadata for the data files. Before the barrier is sent 
downstream, this metadata is forwarded to the downstream commit operator, so 
that the commit operator has all the file metadata required for this 
checkpoint when it takes its snapshot (i.e., once all barriers have been 
received). The metadata is finally committed during the 
`notifyCheckpointComplete` phase.
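
To make the flow concrete, here is a minimal sketch of the writer-side pattern 
described above. `prepareSnapshotPreBarrier` and `notifyCheckpointComplete` are 
existing Flink operator hooks; the `FileMetadata` type and the `bufferRecord()` 
/ `flushToDfs()` methods are hypothetical placeholders, not actual Paimon 
classes.

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Sketch of the current writer-side pattern (placeholders, not Paimon code). */
public class WriterOperatorSketch
        extends AbstractStreamOperator<WriterOperatorSketch.FileMetadata>
        implements OneInputStreamOperator<String, WriterOperatorSketch.FileMetadata> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        bufferRecord(element.getValue()); // accumulate records between checkpoints
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Synchronous phase: flush buffered data to DFS before the barrier is sent
        // downstream. A slow DFS node blocks the task's main thread right here.
        List<FileMetadata> metas = flushToDfs();
        // Forward the file metadata so the downstream commit operator has every file
        // of this checkpoint once all its barriers have arrived; the committer then
        // commits the metadata in notifyCheckpointComplete().
        for (FileMetadata meta : metas) {
            output.collect(new StreamRecord<>(meta));
        }
    }

    private void bufferRecord(String record) { /* buffer in memory / local files */ }

    private List<FileMetadata> flushToDfs() {
        /* write buffered data to DFS */
        return Collections.emptyList();
    }

    /** Hypothetical placeholder for the metadata of one flushed data file. */
    public static class FileMetadata {}
}
```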

We know that `prepareSnapshotPreBarrier` runs in the synchronous phase of the 
checkpoint, so hitting a slow DFS node can block the main thread for a long 
time and hurt throughput. With the optimization in this design, data is 
flushed to local storage during `prepareSnapshotPreBarrier`, and the upload to 
DFS is handled in the checkpoint's asynchronous phase. Flushing to local 
storage is generally stable in terms of performance, and because the DFS 
upload happens asynchronously, the blocking time in the synchronous phase is 
reduced significantly.
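
As a rough illustration of the proposed change, the sketch below moves the DFS 
upload out of the synchronous phase. The `asyncOperate` hook does not exist in 
Flink today; its name and signature here are assumptions based on this 
proposal, and `flushToLocal()` / `uploadToDfs()` are hypothetical placeholders.

```java
/**
 * Hypothetical sketch of the optimized flow; the asyncOperate hook is the one
 * proposed in this issue and is not part of the current Flink API.
 */
public class AsyncWriterOperatorSketch {

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Synchronous phase: only flush buffered data to local storage, which is
        // generally fast and stable, so the barrier can be forwarded quickly.
        flushToLocal(checkpointId);
    }

    /** Proposed hook, executed in the asynchronous phase of the checkpoint. */
    public void asyncOperate(long checkpointId) throws Exception {
        // Asynchronous phase: upload the locally flushed files to DFS. The checkpoint
        // is only marked complete after this finishes, so notifyCheckpointComplete
        // (and the final commit) still sees a fully uploaded checkpoint.
        uploadToDfs(checkpointId);
    }

    private void flushToLocal(long checkpointId) { /* write to local disk */ }

    private void uploadToDfs(long checkpointId) { /* copy local files to DFS */ }
}
```

This keeps the ordering guarantee mentioned above: the final commit in 
`notifyCheckpointComplete` only runs after the checkpoint, including the 
asynchronous upload, has completed.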

Therefore, in this design, these time-consuming operations are strongly tied 
to the Flink checkpoint. I believe similar issues can arise in other scenarios 
(such as writing to other types of data lakes or implementing custom 
time-consuming snapshot logic), so I propose providing an asyncOperate 
interface in the checkpoint to address such problems in a general way.

> Checkpoint supports the Operator to customize asynchronous snapshot state
> -------------------------------------------------------------------------
>
>                 Key: FLINK-37375
>                 URL: https://issues.apache.org/jira/browse/FLINK-37375
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.1
>            Reporter: Jufang He
>            Priority: Major
>              Labels: pull-request-available
>
> In some Flink task operators, slow operations such as file uploads or data 
> flushing may be performed during the synchronous phase of Checkpoint. Due to 
> performance issues with external storage components, the synchronous phase 
> may take too long to execute, significantly impacting the job's throughput. 
> For example, during our internal use of Paimon, we observed that uploading 
> files to HDFS during the Checkpoint synchronous phase could encounter random 
> HDFS slow node issues, leading to a substantial negative impact on task 
> throughput.
> To address this issue, I propose supporting a generic, operator-customizable 
> asynchronous snapshot feature that allows users to move time-consuming logic 
> to the asynchronous phase of Checkpoint, thereby minimizing blocking of the 
> main thread and improving task throughput. For instance, the Paimon writer 
> operator could write data locally during the Checkpoint synchronous phase and 
> upload files to remote storage during the asynchronous phase. Beyond the 
> Paimon data upload scenario, other operator logic may also experience slow 
> execution during the synchronous phase. This approach aims to uniformly 
> optimize such issues.
> I drafted a FLIP for this issue: 
> [https://docs.google.com/document/d/1lwxLEQjD6jVhZUBMRGhzQNWKSvdbPbYNQsV265gR4kw]
>  



