This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e1a7121  [FLINK-23916][docs] Update the documentation how to set tolerable checkpoint failure number
e1a7121 is described below
commit e1a71219454dde66f2779284f8d398fbaf1e69bb
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Sep 1 15:34:43 2021 +0200
    [FLINK-23916][docs] Update the documentation how to set tolerable checkpoint failure number
---
.../datastream/fault-tolerance/checkpointing.md | 24 ++++++++++++++++------
.../api/environment/CheckpointConfig.java | 9 ++++----
2 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index c1e6297..58b4e0a 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -67,6 +67,10 @@ Other parameters for checkpointing include:
   Note that this value also implies that the number of concurrent checkpoints is *one*.
 
+ - *tolerable checkpoint failure number*: This defines how many consecutive checkpoint failures will be tolerated,
+   before the whole job is failed over. The default value is `0`, which means no checkpoint failures will be tolerated,
+   and the job will fail on first reported checkpoint failure.
+
  - *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress.
    This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams.
    It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay
@@ -77,8 +81,6 @@ Other parameters for checkpointing include:
  - *externalized checkpoints*: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are *not* automatically cleaned up when the job fails.
    This way, you will have a checkpoint around to resume from if your job fails.
    There are more details in the [deployment notes on externalized checkpoints]({{< ref "docs/ops/state/checkpoints" >}}#externalized-checkpoints).
 
- - *fail/continue task on checkpoint errors*: This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure. This is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
-
  - *prefer checkpoint for recovery*: This determines if a job will fallback to latest checkpoint even when there are more recent savepoints available to potentially reduce recovery time.
 
  - *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref "docs/ops/state/checkpoints" >}}#unaligned-checkpoints) to greatly reduce checkpointing times under backpressure. Only works for exactly-once checkpoints and with number of concurrent checkpoints of 1.
@@ -102,10 +104,13 @@ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 
+// only two consecutive checkpoint failures are tolerated
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
-// enable externalized checkpoints which are retained
+// enable externalized checkpoints which are retained
 // after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(
     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
@@ -135,13 +140,17 @@ env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig.setCheckpointTimeout(60000)
 
-// prevent the tasks from failing if an error happens in their checkpointing,
-// the checkpoint will just be declined.
-env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
+// only two consecutive checkpoint failures are tolerated
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
 
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
+// enable externalized checkpoints which are retained
+// after job cancellation
+env.getCheckpointConfig().enableExternalizedCheckpoints(
+  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
 // enables the experimental unaligned checkpoints
 env.getCheckpointConfig.enableUnalignedCheckpoints()
@@ -167,6 +176,9 @@ env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
 # checkpoints have to complete within one minute, or are discarded
 env.get_checkpoint_config().set_checkpoint_timeout(60000)
 
+# only two consecutive checkpoint failures are tolerated
+env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)
+
 # allow only one checkpoint to be in progress at the same time
 env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 76abc50..6b0275b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -393,8 +393,8 @@ public class CheckpointConfig implements java.io.Serializable {
     }
 
     /**
-     * Get the tolerable checkpoint failure number which used by the checkpoint failure manager to
-     * determine when we need to fail the job.
+     * Get the defined number of consecutive checkpoint failures that will be tolerated, before the
+     * whole job is failed over.
      *
      * <p>If the {@link #tolerableCheckpointFailureNumber} has not been configured, this method
      * would return 0 which means the checkpoint failure manager would not tolerate any declined
@@ -408,8 +408,9 @@ public class CheckpointConfig implements java.io.Serializable {
     }
 
     /**
-     * Set the tolerable checkpoint failure number, the default value is 0 that means we do not
-     * tolerance any checkpoint failure.
+     * This defines how many consecutive checkpoint failures will be tolerated, before the whole job
+     * is failed over. The default value is `0`, which means no checkpoint failures will be
+     * tolerated, and the job will fail on first reported checkpoint failure.
      */
     public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {
         if (tolerableCheckpointFailureNumber < 0) {
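
The semantics this commit documents (a streak of consecutive checkpoint failures, reset by any success, with the job failing over once the streak exceeds the configured tolerance) can be sketched as a small standalone model. This is illustrative only: the class `TolerableFailureCounter` and its methods are hypothetical, not Flink's actual checkpoint failure manager implementation.

```java
// Hypothetical model of the "tolerable checkpoint failure number" semantics.
// Not Flink code: a counter of consecutive checkpoint failures that signals
// job failover once the streak exceeds the configured tolerance.
public class TolerableFailureCounter {

    private final int tolerable;          // mirrors setTolerableCheckpointFailureNumber(n)
    private int consecutiveFailures = 0;

    public TolerableFailureCounter(int tolerable) {
        if (tolerable < 0) {
            throw new IllegalArgumentException(
                    "The tolerable checkpoint failure number must be non-negative.");
        }
        this.tolerable = tolerable;
    }

    /** Records one checkpoint result; returns true if the job should fail over. */
    public boolean onCheckpointResult(boolean succeeded) {
        if (succeeded) {
            consecutiveFailures = 0;      // a successful checkpoint resets the streak
            return false;
        }
        consecutiveFailures++;
        // With the default of 0, the first failure already exceeds the tolerance.
        return consecutiveFailures > tolerable;
    }
}
```

With a tolerance of 2, as in the documentation snippets above, two consecutive failures are tolerated and the third triggers failover, while an intervening successful checkpoint resets the count to zero.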