This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e1a7121  [FLINK-23916][docs] Update the documentation how to set 
tolerable checkpoint failure number
e1a7121 is described below

commit e1a71219454dde66f2779284f8d398fbaf1e69bb
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Sep 1 15:34:43 2021 +0200

    [FLINK-23916][docs] Update the documentation how to set tolerable 
checkpoint failure number
---
 .../datastream/fault-tolerance/checkpointing.md    | 24 ++++++++++++++++------
 .../api/environment/CheckpointConfig.java          |  9 ++++----
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md 
b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index c1e6297..58b4e0a 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -67,6 +67,10 @@ Other parameters for checkpointing include:
 
     Note that this value also implies that the number of concurrent 
checkpoints is *one*.
 
+  - *tolerable checkpoint failure number*: This defines how many consecutive 
checkpoint failures will be tolerated
+    before the whole job is failed over. The default value is `0`, which means 
no checkpoint failures will be tolerated
+    and the job will fail on the first reported checkpoint failure.
+
   - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
     This ensures that the topology does not spend too much time on checkpoints 
and not make progress with processing the streams.
     It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
@@ -77,8 +81,6 @@ Other parameters for checkpointing include:
 
   - *externalized checkpoints*: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are *not* automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the [deployment notes on externalized 
checkpoints]({{< ref "docs/ops/state/checkpoints" >}}#externalized-checkpoints).
 
-  - *fail/continue task on checkpoint errors*: This determines if a task will 
be failed if an error occurs in the execution of the task's checkpoint 
procedure. This is the default behaviour. Alternatively, when this is disabled, 
the task will simply decline the checkpoint to the checkpoint coordinator and 
continue running.
-
   - *prefer checkpoint for recovery*: This determines if a job will fallback 
to latest checkpoint even when there are more recent savepoints available to 
potentially reduce recovery time.
 
   - *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref 
"docs/ops/state/checkpoints" >}}#unaligned-checkpoints) to greatly reduce 
checkpointing times under backpressure. Only works for exactly-once checkpoints 
and with number of concurrent checkpoints of 1.
@@ -102,10 +104,13 @@ 
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 
+// only two consecutive checkpoint failures are tolerated
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
+
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
-// enable externalized checkpoints which are retained 
+// enable externalized checkpoints which are retained
 // after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(
     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
@@ -135,13 +140,17 @@ env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig.setCheckpointTimeout(60000)
 
-// prevent the tasks from failing if an error happens in their checkpointing,
-// the checkpoint will just be declined.
-env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
+// only two consecutive checkpoint failures are tolerated
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
 
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
+// enable externalized checkpoints which are retained 
+// after job cancellation
+env.getCheckpointConfig().enableExternalizedCheckpoints(
+    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
 // enables the experimental unaligned checkpoints
 env.getCheckpointConfig.enableUnalignedCheckpoints()
 
@@ -167,6 +176,9 @@ 
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
 # checkpoints have to complete within one minute, or are discarded
 env.get_checkpoint_config().set_checkpoint_timeout(60000)
 
+# only two consecutive checkpoint failures are tolerated
+env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)
+
 # allow only one checkpoint to be in progress at the same time
 env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 76abc50..6b0275b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -393,8 +393,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
     }
 
     /**
-     * Get the tolerable checkpoint failure number which used by the 
checkpoint failure manager to
-     * determine when we need to fail the job.
+     * Get the defined number of consecutive checkpoint failures that will be 
tolerated, before the
+     * whole job is failed over.
      *
      * <p>If the {@link #tolerableCheckpointFailureNumber} has not been 
configured, this method
      * would return 0 which means the checkpoint failure manager would not 
tolerate any declined
@@ -408,8 +408,9 @@ public class CheckpointConfig implements 
java.io.Serializable {
     }
 
     /**
-     * Set the tolerable checkpoint failure number, the default value is 0 
that means we do not
-     * tolerance any checkpoint failure.
+     * Sets how many consecutive checkpoint failures will be 
tolerated before the whole job
+     * is failed over. The default value is {@code 0}, which means no checkpoint 
failures will be
+     * tolerated and the job will fail on the first reported checkpoint failure.
      */
     public void setTolerableCheckpointFailureNumber(int 
tolerableCheckpointFailureNumber) {
         if (tolerableCheckpointFailureNumber < 0) {

Reply via email to