rkhachatryan commented on code in PR #540:
URL: https://github.com/apache/flink-web/pull/540#discussion_r879905279
########## _posts/2022-05-20-changelog-state-backend.md: ##########
@@ -0,0 +1,353 @@
---
layout: post
title: "Improving speed and stability of checkpointing with generic log-based incremental checkpoints"
date: 2022-05-20T00:00:00.000Z
authors:
- Roman Khachatryan:
  name: "Roman Khachatryan"
- Yuan Mei:
  name: "Yuan Mei"
excerpt: This post describes the mechanism introduced in Flink 1.15 that continuously uploads state changes to durable storage while performing materialization in the background

---

# Introduction

One of the most important characteristics of stream processing systems is end-to-end latency, i.e. the time it takes for the results of processing an input record to reach the outputs. In the case of Flink, end-to-end latency mostly depends on the checkpointing mechanism, because processing results should only become visible after the state of the stream is persisted to non-volatile storage (this assumes exactly-once mode; in other modes, results can be published immediately).

Furthermore, checkpoint duration also defines the reasonable interval with which checkpoints are made. A shorter interval provides the following advantages:

* Lower latency for transactional sinks: Transactional sinks commit on checkpoints, so faster checkpoints mean more frequent commits.
* More predictable checkpoint intervals: Currently, the duration of a checkpoint depends on the size of the artifacts that need to be persisted in the checkpoint storage.
* Less work on recovery: The more frequent the checkpoints, the fewer events need to be re-processed after recovery.

The following are the main factors affecting checkpoint duration in Flink:

1. Barrier travel time and alignment duration
1. Time to take a state snapshot and persist it onto the non-volatile, highly-available storage (such as S3)

Recent improvements such as [Unaligned checkpoints](https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html) and [Buffer debloating](https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment) try to address (1), especially in the presence of back-pressure. Previously, [Incremental checkpoints](https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html) were introduced to reduce the size of a snapshot, thereby reducing the time required to store it (2).

However, there are still some cases where this duration is high.

### Every checkpoint is delayed by at least one task with high parallelism

<center>
<img src="{{ site.baseurl }}/img/blog/2022-05-20-changelog-state-backend/failing-task.png"/>
<br/>
</center>
<br/>

With the existing incremental checkpoint implementation of the RocksDB state backend, every subtask needs to periodically perform some form of compaction. That compaction results in new, relatively big files, which in turn increase the upload time (2). The probability of at least one node performing such compaction and thus slowing down the whole checkpoint grows proportionally to the number of nodes. In large deployments, almost every checkpoint becomes delayed by some node.

### Unnecessary delay before uploading state snapshot

<center>
<img src="{{ site.baseurl }}/img/blog/2022-05-20-changelog-state-backend/checkpoint-timing.png"/>
<br/>
</center>
<br/>

State backends don't start any snapshotting work until the task receives at least one checkpoint barrier, increasing the effective checkpoint duration. This is suboptimal if the upload time is comparable to the checkpoint interval; instead, a snapshot could be uploaded continuously throughout the interval.
This work discusses the mechanism introduced in Flink 1.15 to address the above cases by continuously persisting state changes to non-volatile storage while performing materialization in the background. The basic idea is described in the following section, and then important implementation details are highlighted. Subsequent sections discuss benchmarking results, limitations, and future work.

# High-level Overview

The core idea is to introduce a state changelog (a log that records state changes); this changelog allows operators to persist state changes in a very fine-grained manner, as described below:

* Stateful operators write the state changes to the state changelog, in addition to applying them to the state tables in RocksDB or the in-memory hash table.
* An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.
* The state tables are persisted periodically as well, independent of the checkpoints. We call this procedure the materialization of the state on the durable checkpoint storage.
* Once the state is materialized on the checkpoint storage, the state changelog can be truncated to the point where the state was materialized.
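The interplay described in the bullet points above can be sketched as a toy model. This is plain Python, not Flink's actual implementation, and all class and method names here are hypothetical: every write goes to both the state table and the changelog, a checkpoint only uploads the log tail since the last materialization, and materialization truncates the log.

```python
# Toy model of the changelog mechanism (illustration only, not Flink code).

class ChangelogStateBackend:
    def __init__(self):
        self.table = {}           # materialized state ("state table")
        self.changelog = []       # (sequence_number, key, value) entries
        self.seq = 0              # next sequence number to assign
        self.materialized_to = 0  # log position covered by the last materialization

    def put(self, key, value):
        # Every change is applied to the state table AND appended to the changelog.
        self.table[key] = value
        self.changelog.append((self.seq, key, value))
        self.seq += 1

    def checkpoint(self):
        # A checkpoint only needs the changes not yet covered by the last
        # materialized snapshot -- typically a small, fast upload.
        return [e for e in self.changelog if e[0] >= self.materialized_to]

    def materialize(self):
        # Runs in the background, independently of checkpoints: persist the
        # state table, then truncate the changelog up to that point.
        snapshot = dict(self.table)
        self.materialized_to = self.seq
        self.changelog = [e for e in self.changelog if e[0] >= self.materialized_to]
        return snapshot

backend = ChangelogStateBackend()
backend.put("a", 1)
backend.put("b", 2)
assert len(backend.checkpoint()) == 2  # both changes must still be uploaded
backend.materialize()                  # state table persisted in the background
backend.put("a", 3)
assert len(backend.checkpoint()) == 1  # only the change since materialization
```

In Flink 1.15 itself, this mechanism is enabled via the `state.backend.changelog.enabled` configuration option (see the Flink documentation for the full set of changelog options), not through any API resembling this sketch.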
This can be illustrated as follows:

<center>
  <div style="overflow-x: auto">
    <div style="width:150%">
      <img style="display:inline; max-width: 33%; max-height: 200px; margin-left: -1%" src="{{ site.baseurl }}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_1.png"/>
      <img style="display:inline; max-width: 33%; max-height: 200px; margin-left: -1%" src="{{ site.baseurl }}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_2.png"/>
      <img style="display:inline; max-width: 33%; max-height: 200px; margin-left: -1%" src="{{ site.baseurl }}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_3.png"/>
    </div>
  </div>
  <br/>
</center>
<br/>

This approach mirrors what database systems do, adjusted to distributed checkpoints:

* Changes (inserts/updates/deletes) are written to the transaction log, and the transaction is considered durable once the log is synced to disk (or other durable storage).
* The changes are also materialized in the tables (so the database system can efficiently query the table). The tables are usually persisted asynchronously.

Once all relevant parts of the changed tables have been persisted, the transaction log can be truncated, which is similar to the materialization procedure in our approach.

Review Comment:
I guess this comment is a duplicate of the one [above](https://github.com/apache/flink-web/pull/540#discussion_r879537601).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
