Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4543#discussion_r139690107
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
```sh
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full,
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flinkâs fault tolerance mechanism and
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can
recover in case of a software or machine
+failure (see [here]({{ site.baseurl
}}/internals/stream_checkpointing.html).Â
+
+Flink creates checkpoints periodically to track the progress of a job so
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal
load from pipelineâs data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+<p class="text-center">
+ <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints"
width="80%" src="{{ site.baseurl }}/fig/incremental_cp_basic.svg"/>
+</p>
+
+With incremental checkpointing, each checkpoint contains only the state
change since the previous checkpoint.
+
+* For the first checkpoint ``CP 1``, there is no difference between a full
checkpoint and the complete state at the time the
+checkpoint is written.
+
+* For ``CP 2``, incremental checkpointing will write only the changes
since ``CP 1``: the value for ``K1`` has changed and a mapping
+for ``K3`` was added.
+
+* For checkpoint ``CP 3``, incremental checkpointing only records the
changes since ``CP 2``.
+
+Notice that, unlike in full checkpoints, we also must record changes that
delete state in an incremental checkpoint, as
+in the case of ``K0``. In this simple example, we can see how incremental
checkpointing can reduce the amount of data that
+is written for each checkpoint.
+
+The next interesting question: how does restoring from incremental
checkpoints compare to restoring from full
+checkpoints? Restoring a full checkpoint is as simple as loading all the
data from the checkpoint back into the job
+state because full checkpoints are self-contained. In contrast, to restore
an incremental checkpoint, we need to replay
+the history of all incremental checkpoints that are in the reference chain
of the checkpoint we are trying to restore.
+In our example from Figure 1, those connections are represented by the
orange arrows. If we want to restore ``CP 3``, as a
+first step, we need to apply all the changes of ``CP 1`` to the empty
initial job state. On top of that, we apply the
+changes from ``CP 2``, and then the changes from ``CP 3``.
+
+A different way to think about basic incremental checkpoints is to imagine
it as a changelog with some aggregation. What
+we mean by aggregated is that for example, if the state under key ``K1``
is overwritten multiple times between two
+consecutive checkpoints, we will only record the latest state value at the
time in the checkpoint. All previous changes
+are thereby subsumed.
+
+This leads us to the discussion of the potential *disadvantages* of
incremental checkpoints compared to full checkpoints.
+While we save work in writing checkpoints, we have to do more work in
reading the data from multiple checkpoints on
+recovery. Furthermore, we can no longer simply delete old checkpoints
because new checkpoints rely upon them and the
+history of old checkpoints can grow indefinitely over time (like a
changelog).
+
+At this point, it looks like we didnât gain too much from incremental
checkpoints because we are again trading between
+checkpointing overhead and recovery time. Fortunately, there are ways to
improve on this naive approach to recovery. One
+simple and obvious way to restrict recovery time and the length of the
checkpoint history is to write a full checkpoint
+from time to time. We can drop all checkpoints prior to the most recent
full checkpoint, and the full checkpoint can
+serve as a new basis for future incremental checkpoints.
+
+Our actual implementation of incremental checkpoints in Flink is more
involved and designed to address a number of
+different tradeoffs. Our incremental checkpointing restricts the size of
the checkpoint history and therefore never
+needs take a full checkpoint to keep recovery efficiently! We present more
detail about this in the next section, but
+the high level idea is to accept a small amount of redundant state writing
to incrementally introduce
+merged/consolidated replacements for previous checkpoints. For now, you
can think about Flinkâs approach as stretching
+out and distributing the consolidation work over several incremental
checkpoints, instead of doing it all at once in a
+full checkpoint. Every incremental checkpoint can contribute a share for
consolidation. We also track when old
+checkpoints data becomes obsolete and then prune the checkpoint history
over time.
+
+### Incremental Checkpoints in Flink
+
+In the previous section, we discussed that incremental checkpointing is
mainly about recording all effective state
+modifications between checkpoints. This poses certain requirements on the
underlying data structures in the state
+backend that manages the jobâs state. It goes without saying that the
data structure should always provide a decent
+read-write performance to keep state access swift. At the same time, for
incremental checkpointing, the state backend
+must be able to efficiently detect and iterate state modifications since
the previous checkpoint.
+
+One data structure that is very well-suited for this use case is the
*log-structured-merge (LSM) tree* that is the core
+data structure in Flinkâs RocksDB-based state backend. Without going
into too much detail, the write path of RocksDB
+already roughly resembles a changelog with some pre-aggregation â which
perfectly fits the needs of incremental
+checkpoints. On top of that, RocksDB also has a *compaction mechanism* can
be regarded as an elaborated form of log
+compaction.
+
+#### RocksDB Snapshots as a Foundation
+
+In a nutshell, *RocksDB is a key-value store based on LSM trees*. The
write path of RocksDB first collects all changes as
+key-value pairs in a mutable, in-memory buffer called memtable. Writes to
the same key in a memtable can simply replace
+previous values (this is the pre-aggregation aspect). Once the memtable is
full, it is written to disk completely with
+all entries sorted by their key. Typically, RocksDB also applies a
lightweight compression (e.g. snappy) in the write
+process. After the memtable was written to disk, it becomes immutable and
is now called a *sorted-string-table
+(sstable)*. Figure 2 illustrates these basic RocksDB internals.
+
+<p class="text-center">
+ <img alt="Figure 2: RocksDB architecture (simplified)" width="75%"
src="{{ site.baseurl }}/fig/rocksdb_architecture_simple.png"/>
+</p>
+
+To avoid the problem of collecting an infinite number of sstables over
time, a background task called compaction is
+constantly merging sstables to consolidate potential duplicate entries for
each key from the merged tables. Once some
+sstables have been merged, those original sstables become obsolete and are
deleted by RocksDB. The newly created merged
+sstable contains all their net information. We show an example for such a
merge in Figure 3. SSTable-1 and SStable-2
+contain some duplicate mappings for certain keys, such as key ``9``. The
system can apply a sort-merge strategy in which
+the newer mappings from ``SSTable-2`` overwrite mappings for keys that
also existed in ``SSTable-1``. For key ``7``, we can also
+see a delete (or antimatter) entry that, when merged, results in omitting
key ``7`` in the merge result. Notice that the
+merge in RocksDB is typically generalised to a multi-way merge. We wonât
go into details about the read path here,
+because it is irrelevant for the approach that we want to present. You can
find more details about RocksDB internals in
+their [documentation](http://rocksdb.org/).
+
+<p class="text-center">
+ <img alt="Figure 3: Merging SSTable files" width="50%" src="{{
site.baseurl }}/fig/sstable_merge.png"/>
+</p>
+
+#### Integrating RocksDBâs Snapshots with Flinkâs Checkpoints
+
+Flinkâs incremental checkpointing logic operates on top of this
mechanism that RocksDB provides. From a high-level
+perspective, when taking a checkpoint, we track which sstable files have
been created and deleted by RocksDB since the
+previous checkpoint. This is sufficient for figuring out the effective
state changes because sstables are immutable. Our
+backend remembers the sstables that already existed in the last completed
checkpoint in order to figure out which files
+have been created or deleted in the current checkpoint interval. With this
in mind, we will now explain the details of
+checkpointing state in our RocksDB backend.
+
+In the first step, Flink triggers a flush in RocksDB so that all all
memtables are forced into sstables on disk, and all
+sstables are hard-linked in a local temporary directory. This step of the
checkpoint is synchronous to the processing
+pipeline, and all further steps are performed asynchronously and will not
block processing.
+
+Then, all new sstables (w.r.t. the previous checkpoint) are copied to
stable storage (e.g. HDFS) and referenced in the
+new checkpoint. All sstables that already existed in the previous
checkpoint will *not be copied again to stable
+storage* but simply re-referenced. Deleted files will simply no longer
receive a reference in the new checkpoint. Notice
+that deleted sstables in RocksDB are always the result of compaction. This
is the way in which Flinkâs incremental
+checkpoints can prune the checkpoint history. Old sstables are eventually
replaced by the sstable that is the result of
+merging them. Note that in a strict sense of tracking changes between
checkpoints, this uploading of consolidated tables
+is redundant work. But it is performed incrementally, typically adding
only a small amount of overhead to some
+checkpoints. However, we absolutely consider that overhead to be a
worthwhile investment because it allows us to keep a
+shorter history of checkpoints to consider in a recovery.
+
+Another interesting point is how Flink can determine when it is safe to
delete a shared file. Our solution works as
+follows: for each file, we keep a reference count for each sstable file
that we copied to stable storage. These counts
+are maintained by the checkpoint coordinator on the job master in a
*shared state registry*. This shared registry tracks
+the number of checkpoints that reference a shared file in stable storage,
e.g. an uploaded sstable. When a checkpoint is
+completed, the checkpoint coordinator simply increases the counts for all
files that are referenced in the new
+checkpoint by 1. If a checkpoint is dropped, the count of all files it has
referenced is decreased by 1. When the count
+goes down to 0, the shared file is deleted from stable storage because it
is no longer used by any checkpoint.
+
+<p class="text-center">
+ <img alt="Figure 4: Flink incremental checkpointing example"
width="100%" src="{{ site.baseurl }}/fig/incremental_cp_impl_example.svg"/>
+</p>
+
+To make this idea a bit more complete, see Figure 4, where we show an
example run over 4 incremental checkpoints to make
+things a bit more concrete. We illustrate what is happening for one
subtask (here: subtask index 1) of one operator
+(called ``Operator-2``) with keyed state. Furthermore, for this example we
assume that the number of retained
+checkpoints is configured to 2, so that Flink will always keep the two
latest checkpoints and older checkpoints are
+pruned. The columns show, for each checkpoint, the state of the local
RocksDB instance (i.e. the current sstable files),
+the files that are referenced in the checkpoint, and the counts in the
shared state registry after the checkpoint is
+completed. For checkpoint 1 (``CP 1``)), we can see that the local RocksDB
directory contains two sstable files, which
+are considered as new and uploaded to stable storage. We upload the files
under the checkpoint directory of the
+corresponding checkpoint that first uploaded them, in this case ``cp-1``,
and use unique filenames because they could
+otherwise collide with identical sstable names from other subtasks. When
the checkpoint completes, the two entries are
+created in the shared state registry, one for each newly uploaded file,
and their counts are set to 1. Notice that the
+key in the shared state registry is a composite of operator, subtask, and
the original sstable file name. In the actual
+implementation, the shared state registry also keeps a mapping from the
key to the file path in stable storage besides
+the count, which is not shown to keep the graphic clearer.
+
+At the time of the second checkpoint, two new sstable files have been
created by RocksDB and the two older sstable files
+from the previous checkpoint also still exist. For checkpoint ``CP 2``,
Flink must now upload the two new files to
+stable storage and can reference the ``sstable-(1)`` and ``sstable-(2)``
from the previous checkpoint. We can see that
+the file references to previously existing sstable files point to existing
files in the ``cp-1`` directory and
+references to new sstable files point to the newly uploaded files in
directory ``cp-2``. When the checkpoint completes,
+the counts for all referenced files are increased by 1.
+
+For checkpoint ``CP 3``, we see that RocksDBâs compaction has merged
``sstable-(1)``, ``sstable-(2)``, and
+``sstable-(3)`` into ``sstable-(1,2,3)``. This merged table contains the
same net information as the source files and
+eliminates any duplicate entries for each key that might have existed
across the three source files. The source files of
+the merge have been deleted, ``sstable-(4)`` still exists, and one
additional ``sstable-(5)`` was created. For the
+checkpoint, we need to upload the new files ``sstable-(1,2,3)`` and
``sstable-(5)`` and can re-reference ``sstable-(4)``
+from a previous checkpoint. When this checkpoint completes, two things
will happen at the checkpoint coordinator, in the
+following order:
+
+* First, the checkpoint registers the referenced files, increasing the
count of those files by 1.
+
+* Then, the older checkpoint ``CP 1`` will be deleted because we have
configured the number of retained checkpoints to
+two.
+
+* As part of the deletion, the counts for all files referenced by ``CP
1``, (``sstable-(1)`` and ``sstable-(2)``), is
+decreased by 1.
--- End diff --
the counts ... are decreased by 1
---