[
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265563#comment-16265563
]
ASF GitHub Bot commented on FLINK-7449:
---------------------------------------
Github user ChrisChinchilla commented on a diff in the pull request:
https://github.com/apache/flink/pull/4543#discussion_r153034462
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
```sh
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full,
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and
old checkpoints are eventually subsumed and
+pruned automatically.
+
+While we strongly encourage the use of incremental checkpoints for Flink jobs
+with large state, please note that this is a new feature and is currently not
+enabled by default.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+ RocksDBStateBackend backend =
+ new RocksDBStateBackend(filebackend, true);
+```
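+
+For a slightly fuller picture, here is a sketch of how this could look in a
+job's setup code. The checkpoint directory URI and the surrounding `main`
+scaffolding are illustrative placeholders only:
+
+```java
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class IncrementalCheckpointJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // "hdfs:///flink/checkpoints" is just a placeholder checkpoint directory;
+        // the second constructor argument enables incremental checkpoints.
+        RocksDBStateBackend backend =
+            new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
+        env.setStateBackend(backend);
+
+        // ... define the streaming job and call env.execute(...) here ...
+    }
+}
+```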
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism, and each
+checkpoint represents a consistent snapshot of the distributed state of a Flink
+job from which the system can recover in case of a software or machine failure
+(see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
+
+Flink creates checkpoints periodically to track the progress of a job so
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
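+
+How often checkpoints are taken is configured on the execution environment. As
+an illustration (the concrete intervals below are arbitrary example values, not
+recommendations):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Trigger a checkpoint every 10 seconds (value chosen purely for illustration).
+env.enableCheckpointing(10000);
+
+// Optionally enforce a pause between the end of one checkpoint and the start of
+// the next, so that checkpointing does not monopolize the job's resources.
+env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
+```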
+
+However, checkpoints are not without performance cost and can introduce
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal
+load from the pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal tradeoff
+between recovery time and checkpointing overhead: fast recovery and low
+checkpointing overhead were conflicting goals. This is exactly the problem
+that incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, we briefly discuss the idea behind incremental checkpoints in
+a simplified manner, just to get the concept across.
+
+Our motivation for incremental checkpointing stemmed from the observation
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+<p class="text-center">
+ <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints"
width="80%" src="{{ site.baseurl }}/fig/incremental_cp_basic.svg"/>
+</p>
+
+With incremental checkpointing, each checkpoint contains only the state
change since the previous checkpoint.
+
+* For the first checkpoint ``CP 1``, there is no difference between a full
checkpoint and the complete state at the time the
+checkpoint is written.
+
+* For ``CP 2``, incremental checkpointing will write only the changes
since ``CP 1``: the value for ``K1`` has changed and a mapping
+for ``K3`` was added.
+
+* For checkpoint ``CP 3``, incremental checkpointing only records the
changes since ``CP 2``.
+
+Notice that, unlike with full checkpoints, an incremental checkpoint must also
+record changes that delete state, as in the case of ``K0``. In this simple
+example, we can see how incremental checkpointing can reduce the amount of data
+that is written for each checkpoint.
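+
+To make this bookkeeping concrete, here is a toy sketch in plain Java of what an
+incremental "checkpoint" conceptually records. This is purely illustrative and
+is not how Flink's RocksDB backend works internally; the `StateDelta` class and
+the `diff` helper are invented for this example:
+
+```java
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+// A hypothetical record of the changes since the previous checkpoint.
+class StateDelta {
+    final Map<String, String> updates = new HashMap<>();  // added or changed entries
+    final Set<String> deletions = new HashSet<>();         // keys that were removed
+}
+
+class IncrementalToy {
+
+    // Compute the changes between the state at the previous checkpoint and now.
+    static StateDelta diff(Map<String, String> previous, Map<String, String> current) {
+        StateDelta delta = new StateDelta();
+        for (Map.Entry<String, String> e : current.entrySet()) {
+            // record entries that are new or whose value changed (e.g. K1, K3)
+            if (!e.getValue().equals(previous.get(e.getKey()))) {
+                delta.updates.put(e.getKey(), e.getValue());
+            }
+        }
+        for (String key : previous.keySet()) {
+            // record keys that existed before but are gone now (e.g. K0)
+            if (!current.containsKey(key)) {
+                delta.deletions.add(key);
+            }
+        }
+        return delta;
+    }
+}
+```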
+
+The next interesting question: how does restoring from incremental
checkpoints compare to restoring from full
+checkpoints? Restoring a full checkpoint is as simple as loading all the
data from the checkpoint back into the job
+state because full checkpoints are self-contained. In contrast, to restore
an incremental checkpoint, we need to replay
+the history of all incremental checkpoints that are in the reference chain
of the checkpoint we are trying to restore.
+In our example from Figure 1, those connections are represented by the
orange arrows. If we want to restore ``CP 3``, as a
+first step, we need to apply all the changes of ``CP 1`` to the empty
initial job state. On top of that, we apply the
+changes from ``CP 2``, and then the changes from ``CP 3``.
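+
+Continuing the toy sketch from above (still illustrative only), restoring
+``CP 3`` amounts to starting from an empty state and replaying the recorded
+changes of ``CP 1``, ``CP 2``, and ``CP 3`` in order:
+
+```java
+// Replay the chain of deltas, oldest first, to rebuild the full state.
+static Map<String, String> restore(List<StateDelta> chain) {
+    Map<String, String> state = new HashMap<>();
+    for (StateDelta delta : chain) {
+        state.putAll(delta.updates);                // apply added/changed entries
+        state.keySet().removeAll(delta.deletions);  // apply recorded deletions
+    }
+    return state;
+}
+
+// e.g. restore(Arrays.asList(cp1Delta, cp2Delta, cp3Delta)) rebuilds the state at CP 3
+```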
+
+A different way to think about basic incremental checkpoints is to imagine them
+as a changelog with some aggregation. What we mean by aggregation is that, for
+example, if the state under key ``K1`` is overwritten multiple times between
+two consecutive checkpoints, we record only the state value that is current at
+the time of the checkpoint. All previous changes are thereby subsumed.
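+
+In terms of the toy sketch above, this aggregation happens automatically: if
+``K1`` is overwritten several times between two checkpoints, only the value
+that is current when the next checkpoint is taken shows up in the delta (the
+concrete values below are invented):
+
+```java
+Map<String, String> atCp1 = new HashMap<>();
+atCp1.put("K1", "a");
+
+Map<String, String> live = new HashMap<>(atCp1);
+live.put("K1", "b");   // intermediate update, never checkpointed
+live.put("K1", "c");   // another intermediate update
+live.put("K1", "d");   // the value at the moment CP 2 is taken
+
+// diff() records only K1 -> "d"; the writes of "b" and "c" are subsumed.
+StateDelta cp2Delta = IncrementalToy.diff(atCp1, live);
+```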
+
+This leads us to the discussion of the potential *disadvantages* of
incremental checkpoints compared to full checkpoints.
+While we save work in writing checkpoints, we have to do more work in
reading the data from multiple checkpoints on
+recovery. Furthermore, we can no longer simply delete old checkpoints
because new checkpoints rely upon them and the
+history of old checkpoints can grow indefinitely over time (like a
changelog).
+
+At this point, it looks like we didn’t gain too much from incremental
checkpoints because we are again trading between
+checkpointing overhead and recovery time. Fortunately, there are ways to
improve on this naive approach to recovery. One
+simple and obvious way to restrict recovery time and the length of the
checkpoint history is to write a full checkpoint
+from time to time. We can drop all checkpoints prior to the most recent
full checkpoint, and the full checkpoint can
+serve as a new basis for future incremental checkpoints.
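+
+A toy sketch of this naive scheme (illustrative only; as explained next, this is
+not what Flink actually does): every N-th checkpoint is written as a full,
+self-contained snapshot, and everything older than the latest full snapshot can
+be dropped. The constant and helper below are invented for the example:
+
+```java
+// Hypothetical policy: every 10th checkpoint is a full snapshot, which bounds
+// the length of the delta chain that must be replayed on recovery.
+static final int FULL_CHECKPOINT_EVERY = 10;
+
+static boolean writeFullCheckpoint(long checkpointId) {
+    return checkpointId % FULL_CHECKPOINT_EVERY == 0;
+}
+
+// After a full checkpoint completes, all older checkpoints (full or incremental)
+// are no longer referenced for recovery and can be deleted.
+```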
+
+Our actual implementation of incremental checkpoints in Flink is more involved
+and designed to address a number of different tradeoffs. Our incremental
+checkpointing restricts the size of the checkpoint history and therefore never
+needs to take a full checkpoint to keep recovery efficient! We present more
+detail about this in the next section, but
--- End diff --
@StefanRRichter Is the whole section…
> We present more detail about this in the next section…
Until the end of the paragraph needed? If you're about to cover it below
anyway, why mention it? Or is there anything here that isn't mentioned in the
> ### Incremental Checkpoints in Flink
section?
> Improve and enhance documentation for incremental checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-7449
> URL: https://issues.apache.org/jira/browse/FLINK-7449
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.4.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Minor
>
> We should provide more details about incremental checkpoints in the
> documentation.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)