[
https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648283#comment-15648283
]
ASF GitHub Bot commented on FLINK-4975:
---------------------------------------
GitHub user StephanEwen opened a pull request:
https://github.com/apache/flink/pull/2773
[backport] [FLINK-4975] [checkpointing] Add a limit for how much data may
be buffered in alignment
## This is a backport of #2754 to the 1.1.x branch.
## This already incorporates the feedback on #2754
-----
In corner-case situations, checkpoint alignment can take very long and
buffer/spill a lot of data. This PR introduces a limit on how much data
may be buffered during alignment. If that volume is exceeded, the checkpoint
is aborted.
While such overly large alignments should not occur in a healthy
environment, the limit is an important safety net to have.
This Pull Request consists of three parts:
### Introduce Cancellation Barriers
These *Cancellation Barriers* are like checkpoint barriers, flowing with
the data, but they signal that a checkpoint should be aborted rather than
marking the position of that stream in the checkpoint.
This adds extensive tests to the `BarrierBuffer` and `BarrierTracker` to verify
that these Cancellation Barriers are correctly interpreted and interplay well
with other situations of alignment starts and cancellations (such as when newer
barriers come early).
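
To illustrate the interplay described above, here is a minimal sketch of how an aligning task might treat the two barrier kinds. It is not the actual `BarrierBuffer` code: the class `AlignmentSketch`, its enums, and its method names are hypothetical, and real alignment also buffers and releases data, which is omitted here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical model of barrier alignment; not Flink's BarrierBuffer. */
final class AlignmentSketch {

    enum Kind { CHECKPOINT, CANCEL }

    enum Outcome { ALIGNING, COMPLETED, ABORTED, IGNORED }

    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final List<Long> abortedIds = new ArrayList<>();
    private long currentId = -1; // checkpoint being aligned, -1 if none
    private long latestId = -1;  // highest checkpoint id seen so far

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Processes one barrier and reports what happened to its checkpoint. */
    Outcome onBarrier(Kind kind, long checkpointId, int channel) {
        boolean stale = checkpointId < latestId
                || (checkpointId == latestId && currentId != checkpointId);
        if (stale) {
            // Barrier of a checkpoint that already completed, aborted, or was superseded.
            return Outcome.IGNORED;
        }
        if (currentId != -1 && currentId != checkpointId) {
            // A newer checkpoint arrived early and supersedes the pending alignment.
            abortCurrent();
        }
        latestId = checkpointId;
        if (kind == Kind.CANCEL) {
            if (currentId == checkpointId) {
                abortCurrent();
            } else {
                abortedIds.add(checkpointId); // canceled before alignment even started
            }
            return Outcome.ABORTED;
        }
        currentId = checkpointId;
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            // Barriers from all channels arrived: alignment is done.
            currentId = -1;
            blockedChannels.clear();
            return Outcome.COMPLETED;
        }
        return Outcome.ALIGNING;
    }

    private void abortCurrent() {
        abortedIds.add(currentId);
        currentId = -1;
        blockedChannels.clear(); // unblock all channels
    }

    List<Long> abortedIds() {
        return abortedIds;
    }
}
```

In this sketch a late checkpoint barrier for an already canceled checkpoint is ignored, and a barrier with a higher id aborts the pending alignment before starting a new one.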
### Adjust Tasks and Checkpoint Coordinator
Tasks emit cancellation barriers whenever they cannot start a checkpoint or
whenever a checkpoint alignment was canceled. That lets downstream tasks know
earlier that they should stop the alignment for that checkpoint, because it
will not be able to complete.
Tasks also explicitly send "decline" messages to the checkpoint coordinator
for checkpoints they "skipped" due to alignment being cancelled or superseded.
Previously the assumptions were:
- When a Source Task cannot start a checkpoint, a new checkpoint must be
triggered immediately, to dissolve any started downstream alignments that
otherwise would not be able to complete.
- Whenever an alignment is aborted by a newer checkpoint barrier coming
in, that newer barrier will eventually reach the downstream task and break
outdated pending alignments. The cancellation barrier will now break the
outdated alignment earlier.
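
As a rough illustration of the "decline" messages mentioned in this section, the sketch below shows one way a coordinator could discard a pending checkpoint as soon as any task declines it, instead of waiting for it to time out. The class `CoordinatorSketch` and its methods are hypothetical and do not mirror the real Checkpoint Coordinator API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified coordinator; not Flink's CheckpointCoordinator. */
final class CoordinatorSketch {

    private final Set<String> allTasks;
    // Pending checkpoint id -> tasks that have not acknowledged yet.
    private final Map<Long, Set<String>> pending = new HashMap<>();

    CoordinatorSketch(String... tasks) {
        this.allTasks = new HashSet<>(Arrays.asList(tasks));
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, new HashSet<>(allTasks));
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    boolean acknowledge(long checkpointId, String task) {
        Set<String> waiting = pending.get(checkpointId);
        if (waiting == null) {
            return false; // unknown, completed, or already discarded
        }
        waiting.remove(task);
        if (waiting.isEmpty()) {
            pending.remove(checkpointId);
            return true;
        }
        return false;
    }

    /** A single decline discards the pending checkpoint; returns true if one was discarded. */
    boolean decline(long checkpointId, String task) {
        return pending.remove(checkpointId) != null;
    }

    boolean isPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }
}
```

Acknowledgements that arrive after a decline find no pending entry and are dropped, so a declined checkpoint can never complete in this model.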
### Alignment Size Limit
When the `BarrierBuffer` has buffered more than a certain number of bytes,
it aborts the alignment and signals the Task that the checkpoint was aborted.
The Task sends a cancellation barrier for that checkpoint downstream, to signal
the downstream tasks that they should not wait for a proper barrier.
The maximum alignment size is a config option:
`task.checkpoint.alignment.max-size`
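
The size check itself can be pictured with the following minimal sketch. It only models the byte counting; the class `AlignmentSizeLimitSketch`, its methods, and the convention that a non-positive limit means "no limit" are assumptions made for illustration, not the actual `BarrierBuffer` implementation.

```java
/** Hypothetical byte counter for a single alignment; not Flink's BarrierBuffer. */
final class AlignmentSizeLimitSketch {

    // Assumption of this sketch: a value <= 0 disables the limit.
    private final long maxBufferedBytes;
    private long bufferedBytes;

    AlignmentSizeLimitSketch(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /**
     * Accounts for data buffered from a blocked channel.
     *
     * @return true if the limit was exceeded and the alignment must be aborted
     */
    boolean buffer(long numBytes) {
        bufferedBytes += numBytes;
        if (maxBufferedBytes > 0 && bufferedBytes > maxBufferedBytes) {
            // Abort: the buffered data is released and the counter starts over.
            bufferedBytes = 0;
            return true;
        }
        return false;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

When `buffer` returns `true`, the caller would notify the Task that the checkpoint was aborted, and the Task would then emit the cancellation barrier downstream.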
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink backport
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2773.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2773
----
commit a1f028dee49928ada014632bb27216b36e30250e
Author: Stephan Ewen
Date: 2016-10-23T16:41:32Z
[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal
aborted checkpoints
commit b643edf1ace88b34c9cea5e892c440ad114a46fe
Author: Stephan Ewen
Date: 2016-11-03T14:28:15Z
[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered
in alignment.
If more data than the defined amount is buffered, the alignment is aborted
and the checkpoint canceled.
commit 0d890024299aecd3279d9f033415a206622e0425
Author: Stephan Ewen
Date: 2016-11-08T16:13:19Z
[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the
Checkpoint Coordinator
----
> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
> Key: FLINK-4975
> URL: https://issues.apache.org/jira/browse/FLINK-4975
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After
> exceeding that limit, the checkpoint alignment should abort and the
> checkpoint be canceled.