rkhachatryan commented on code in PR #540:
URL: https://github.com/apache/flink-web/pull/540#discussion_r879905279


##########
_posts/2022-05-20-changelog-state-backend.md:
##########
@@ -0,0 +1,353 @@
+---
+layout: post 
+title:  "Improving speed and stability of checkpointing with generic log-based 
incremental checkpoints"
+date: 2022-05-20T00:00:00.000Z 
+authors:
+- Roman Khachatryan:
+  name: "Roman Khachatryan"
+- Yuan Mei:
+  name: "Yuan Mei"
+excerpt: This post describes the mechanism introduced in Flink 1.15 that continuously uploads state changes to durable storage while performing materialization in the background
+
+---
+
+# Introduction
+
+One of the most important characteristics of stream processing systems is 
end-to-end latency, i.e. the time it takes for the results of processing an 
input record to reach the outputs. In the case of Flink, end-to-end latency 
mostly depends on the checkpointing mechanism, because processing results 
should only become visible after the state of the stream is persisted to 
non-volatile storage (this is assuming exactly-once mode; in other modes, 
results can be published immediately).
+
+Furthermore, checkpoint duration also defines the reasonable interval at which checkpoints are made. A shorter interval provides the following advantages:
+
+* Lower latency for transactional sinks: Transactional sinks commit on 
checkpoints, so faster checkpoints mean more frequent commits.
+* More predictable checkpoint intervals: Currently, the duration of a checkpoint depends on the size of the artifacts that need to be persisted in the checkpoint storage.
+* Less work on recovery: The more frequent the checkpoints, the fewer events need to be re-processed after recovery.
+
+Following are the main factors affecting checkpoint duration in Flink:
+
+1. Barrier travel time and alignment duration
+1. Time to take a state snapshot and persist it to non-volatile, highly available storage (such as S3)
+
+Recent improvements such as [Unaligned checkpoints](https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html) and [Buffer debloating](https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment) try to address (1), especially in the presence of back-pressure. Previously, [Incremental checkpoints](https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html) were introduced to reduce the size of a snapshot, thereby reducing the time required to store it (2).
+
+However, there are still some cases when this duration is high:
+
+### With high parallelism, every checkpoint is delayed by at least one slow task
+
+
+<center>
+<img src="{{ site.baseurl 
}}/img/blog/2022-05-20-changelog-state-backend/failing-task.png"/>
+<br/>
+</center>
+<br/>
+
+
+With the existing incremental checkpoint implementation of the RocksDB state 
backend, every subtask needs to periodically perform some form of compaction. 
That compaction results in new, relatively big files, which in turn increase 
the upload time (2). The probability of at least one node performing such 
compaction and thus slowing down the whole checkpoint grows proportionally to 
the number of nodes. In large deployments, almost every checkpoint becomes 
delayed by some node.
+
+### Unnecessary delay before uploading state snapshot
+
+<center>
+<img src="{{ site.baseurl 
}}/img/blog/2022-05-20-changelog-state-backend/checkpoint-timing.png"/>
+<br/>
+</center>
+<br/>
+
+
+State backends don't start any snapshotting work until the task receives at 
least one checkpoint barrier, increasing the effective checkpoint duration. 
This is suboptimal if the upload time is comparable to the checkpoint interval; 
instead, a snapshot could be uploaded continuously throughout the interval.
+
+This post discusses the mechanism introduced in Flink 1.15 to address the above cases by continuously persisting state changes on non-volatile storage while performing materialization in the background. The basic idea is described in the following section, after which important implementation details are highlighted. Subsequent sections discuss benchmarking results, limitations, and future work.
+
+# High-level Overview
+
+The core idea is to introduce a state changelog (a log that records state 
changes); this changelog allows operators to persist state changes in a very 
fine-grained manner, as described below:
+
+* Stateful operators write the state changes to the state changelog, in addition to applying them to the state tables in RocksDB or the in-memory hash table.
+* An operator can acknowledge a checkpoint as soon as the changes in the log 
have reached the durable checkpoint storage.
+* The state tables are persisted periodically as well, independent of the 
checkpoints. We call this procedure the materialization of the state on the 
durable checkpoint storage.
+* Once the state is materialized on the checkpoint storage, the state 
changelog can be truncated to the point where the state is materialized.
+
+This can be illustrated as follows:
+
+<center>
+    <div style="overflow-x: auto">
+        <div style="width:150%">
+            <img style="display:inline; max-width: 33%; max-height: 200px; 
margin-left: -1%" src="{{ site.baseurl 
}}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_1.png"/> 
+            <img style="display:inline; max-width: 33%; max-height: 200px; 
margin-left: -1%" src="{{ site.baseurl 
}}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_2.png"/> 
+            <img style="display:inline; max-width: 33%; max-height: 200px; 
margin-left: -1%" src="{{ site.baseurl 
}}/img/blog/2022-05-20-changelog-state-backend/log_checkpoints_3.png"/> 
+        </div>
+    </div>
+
+    <br/>
+</center>
+<br/>
+
+
+
+This approach mirrors what database systems do, adjusted to distributed 
checkpoints:
+
+* Changes (inserts/updates/deletes) are written to the transaction log, and 
the transaction is considered durable once the log is synced to disk (or other 
durable storage).
+* The changes are also materialized in the tables (so the database system can 
efficiently query the table). The tables are usually persisted asynchronously.
+
+Once all relevant parts of the changed tables have been persisted, the transaction log can be truncated, which is similar to the materialization procedure in our approach.
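+
+The log-plus-materialization cycle can be sketched in a few lines of Python (a toy model for illustration, not Flink code): every update is appended to the changelog and applied to the state table; a checkpoint consists of the last materialized table plus the log tail written since; materialization persists the current table and truncates the log.

```python
class ChangelogStateBackend:
    """Toy model of the changelog idea; not Flink's implementation."""

    def __init__(self):
        self.table = {}         # state table (stand-in for RocksDB / hash table)
        self.changelog = []     # durable, append-only state changelog
        self.materialized = {}  # last materialized snapshot of the table

    def put(self, key, value):
        # Record the change in the changelog AND apply it to the table.
        self.changelog.append(("put", key, value))
        self.table[key] = value

    def checkpoint(self):
        # A checkpoint = last materialized snapshot + log tail since it.
        return dict(self.materialized), list(self.changelog)

    def materialize(self):
        # Periodically persist the full table, then truncate the log:
        # the truncated changes are now covered by the snapshot.
        self.materialized = dict(self.table)
        self.changelog.clear()
```

+Note how a checkpoint only needs to flush the (small) log tail, while the (large) snapshot upload happens in the background during materialization.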

Review Comment:
   I guess this comment is a duplicate of the one 
[above](https://github.com/apache/flink-web/pull/540#discussion_r879537601).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
