[
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171683#comment-16171683
]
ASF GitHub Bot commented on FLINK-7449:
---------------------------------------
Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4543#discussion_r139686793
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
```sh
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and
+pruned automatically.
+
+**While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is a new feature and currently not enabled by default.**
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+    // 'filebackend' is the backend for the checkpoint streams, e.g. an FsStateBackend;
+    // the second constructor argument enables incremental checkpoints
+    RocksDBStateBackend backend =
+        new RocksDBStateBackend(filebackend, true);
+```
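+
+A minimal, self-contained sketch of wiring this into a job might look as follows (the checkpoint directory URI, interval,
+and job name are illustrative assumptions, not prescribed values):
+
+```java
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class IncrementalCheckpointingJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // take a checkpoint every 10 seconds (illustrative value)
+        env.enableCheckpointing(10_000);
+
+        // 'true' as the second constructor argument enables incremental checkpoints
+        env.setStateBackend(
+            new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
+
+        // ... sources, transformations, and sinks go here ...
+
+        env.execute("incremental-checkpointing-example");
+    }
+}
+```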
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centerpiece of Flink’s fault tolerance mechanism, and each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine
+failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
+
+Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
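+
+For example, a short checkpoint interval (the one-second value below is an illustrative assumption) bounds how much
+input has to be reprocessed after a failure:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// checkpoint every second; after a failure, only roughly the last second
+// of events needs to be reprocessed from the source
+env.enableCheckpointing(1_000);
+```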
+
+However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing
+overhead: fast recovery and low checkpointing overhead were conflicting goals. This is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically
+different: only a fraction of the state data is modified, and some new data is added. Instead of writing the full state
+into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long
+as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current
+state for the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.
+
+The state of the job evolves over time and, with full checkpointing, each checkpoint (``CP 1``, ``CP 2``, ...) is simply
+a copy of the whole state at the time the checkpoint is taken.
+
+<p class="text-center">
+  <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints" width="80%" src="{{ site.baseurl }}/fig/incremental_cp_basic.svg"/>
+</p>
+
+With incremental checkpointing, each checkpoint contains only the state changes since the previous checkpoint.
+
+* For the first checkpoint ``CP 1``, there is no difference between an incremental and a full checkpoint: it contains
+the complete state at the time the checkpoint is written.
+
+* For ``CP 2``, incremental checkpointing will write only the changes since ``CP 1``: the value for ``K1`` has changed
+and a mapping for ``K3`` was added.
+
+* For checkpoint ``CP 3``, incremental checkpointing only records the changes since ``CP 2``.
+
+Notice that, unlike with full checkpoints, an incremental checkpoint must also record changes that delete state, as
+in the case of ``K0``. In this simple example, we can see how incremental checkpointing can reduce the amount of data
+that is written for each checkpoint.
+
+The next interesting question: how does restoring from incremental checkpoints compare to restoring from full
+checkpoints? Restoring a full checkpoint is as simple as loading all the data from the checkpoint back into the job
+state because full checkpoints are self-contained. In contrast, to restore an incremental checkpoint, we need to replay
+the history of all incremental checkpoints that are in the reference chain of the checkpoint we are trying to restore.
+In our example from Figure 1, those connections are represented by the orange arrows. If we want to restore ``CP 3``, as
+a first step, we need to apply all the changes of ``CP 1`` to the empty initial job state. On top of that, we apply the
+changes from ``CP 2``, and then the changes from ``CP 3``.
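+
+To make the replay idea concrete, here is a toy model of this restore procedure (not Flink's actual implementation,
+which operates on RocksDB files): each incremental checkpoint is modeled as a map of the keys changed since the previous
+checkpoint, with a ``null`` value marking a deletion, as in the case of ``K0``.
+
+```java
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IncrementalRestoreModel {
+
+    /**
+     * Replays a chain of incremental checkpoints (e.g. CP 1, CP 2, CP 3)
+     * onto an empty initial state to reconstruct the full, current state.
+     * A null value in a delta marks a recorded deletion.
+     */
+    public static Map<String, String> restore(List<Map<String, String>> chain) {
+        Map<String, String> state = new HashMap<>();
+        for (Map<String, String> delta : chain) {
+            for (Map.Entry<String, String> change : delta.entrySet()) {
+                if (change.getValue() == null) {
+                    state.remove(change.getKey()); // a deletion, like K0
+                } else {
+                    state.put(change.getKey(), change.getValue()); // insert/update
+                }
+            }
+        }
+        return state;
+    }
+}
+```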
+
+A different way to think about basic incremental checkpoints is to imagine them as a changelog with some aggregation.
+By aggregation we mean that, for example, if the state under key ``K1`` is overwritten multiple times between two
+consecutive checkpoints, we will only record the latest state value at the time of the checkpoint. All previous changes
+are thereby subsumed.
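+
+In terms of the toy model above, this aggregation falls out naturally when the delta is kept as a map (the values here
+are made up for illustration):
+
+```java
+Map<String, String> delta = new HashMap<>();
+delta.put("K1", "a"); // first write after the previous checkpoint
+delta.put("K1", "b"); // overwrite between checkpoints
+// delta now contains only {K1=b}; the earlier value "a" is subsumed
+```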
+
+This leads us to the discussion of the potential *disadvantages* of incremental checkpoints compared to full checkpoints.
+While we save work in writing checkpoints, we have to do more work in reading the data from multiple checkpoints on
+recovery. Furthermore, we can no longer simply delete old checkpoints because new checkpoints rely upon them, and the
+history of old checkpoints can grow indefinitely over time (like a changelog).
+
+At this point, it looks like we didn’t gain much from incremental checkpoints, because we are again trading between
+checkpointing overhead and recovery time. Fortunately, there are ways to improve on this naive approach to recovery. One
+simple and obvious way to restrict recovery time and the length of the checkpoint history is to write a full checkpoint
+from time to time. We can drop all checkpoints prior to the most recent full checkpoint, and the full checkpoint can
+serve as a new basis for future incremental checkpoints, as sketched below.
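+
+Continuing the toy model from above, this naive consolidation amounts to replaying the existing delta chain into a
+single full checkpoint that then replaces the old history:
+
+```java
+// given 'chain', the list of delta maps from the model above:
+// the merged result of the whole chain becomes a new full checkpoint,
+// and all checkpoints prior to it can be dropped
+Map<String, String> fullCheckpoint = IncrementalRestoreModel.restore(chain);
+List<Map<String, String>> newHistory = new ArrayList<>();
+newHistory.add(fullCheckpoint); // new basis for future incremental checkpoints
+```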
+
+Our actual implementation of incremental checkpoints in Flink is more involved and designed to address a number of
+different tradeoffs. Our incremental checkpointing restricts the size of the checkpoint history and therefore never
+needs to take a full checkpoint to keep recovery efficient! We present more detail about this in the next section, but
+the high-level idea is to accept a small amount of redundant state writing in order to incrementally introduce
+merged/consolidated replacements for previous checkpoints. For now, you can think about Flink’s approach as stretching
+out and distributing the consolidation work over several incremental checkpoints, instead of doing it all at once in a
+full checkpoint. Every incremental checkpoint can contribute a share of the consolidation work. We also track when
+previously checkpointed data becomes obsolete and then prune the checkpoint history over time.
--- End diff --
when old checkpoint data OR when previously checkpointed data
> Improve and enhance documentation for incremental checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-7449
> URL: https://issues.apache.org/jira/browse/FLINK-7449
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.4.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Minor
>
> We should provide more details about incremental checkpoints in the
> documentation.