infoverload commented on a change in pull request #17135:
URL: https://github.com/apache/flink/pull/17135#discussion_r707108617
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -83,6 +83,8 @@ Other parameters for checkpointing include:
- *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref "docs/ops/state/unaligned_checkpoints" >}}) to greatly reduce checkpointing times under backpressure. Only works for exactly-once checkpoints and with number of concurrent checkpoints of 1.
+ - *checkpoints with finished tasks*: You can enable an experimental feature to continue performing checkpoints even if parts of the DAG have finished processing all of their records. Before doing so please read through the [important considerations](#checkpointing-with-parts-of-the-graph-finished).
+
Review comment:
```suggestion
- *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref "docs/ops/state/unaligned_checkpoints" >}}) to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with the maximum number of concurrent checkpoints set to 1.
- *checkpoints with finished tasks*: You can enable an experimental feature to continue performing checkpoints even if parts of the DAG have finished processing all of their records. Before doing so, please read through some [important considerations](#checkpointing-with-parts-of-the-graph-finished).
```
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -211,4 +223,46 @@ Flink currently only provides processing guarantees for jobs without iterations.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
+## Checkpointing with parts of the graph finished *(BETA)*
+
+Starting from Flink 1.14 it is possible to continue performing checkpoints even if parts of the job
+graph have finished processing all data, which might happen if it contains bounded sources. The
+feature must be enabled via a feature flag:
+
+```java
+Configuration config = new Configuration();
+config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+```
+
+Once tasks/subtasks are finished they don't contribute to the checkpoints any longer. It is an
+important observation that puts certain requirements on the implementation of any custom operators
+or UDFs. In order to support checkpointing with tasks that finish we adjusted the [task lifecycle]({{< ref "docs/internals/task_lifecycle" >}})
+and introduced the {{< javadoc file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" name="StreamOperator#finish" >}}
+method. The method is expected to be a clear cutoff point for flushing any remaining buffered state.
+All checkpoints taken after the `finish` method has been called should be in most cases empty and
+shouldn't contain any buffered data, as there will be no way to emit this data. One notable
+exception is if your operator has some pointers to transactions in external systems, for example in
+order to implement the exactly-once semantic. In such a case, checkpoints taken after invoking `finish()`
+method should keep a pointer to the last transaction(s) that will be committed in the final checkpoint
+before the operator is closed. A good built-in example of this are exactly-once sinks and the
+`TwoPhaseCommitSinkFunction`. What does it mean in more details?
Review comment:
````suggestion
## Checkpointing with parts of the job graph finished *(BETA)*

Starting from Flink 1.14, it is possible to continue performing checkpoints even if parts of the job
graph have finished processing all of their data, which might happen if the job contains bounded
sources. This feature must be enabled via a feature flag:

```java
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
```

Once tasks/subtasks have finished, they no longer contribute to checkpoints. This is an
important consideration when implementing any custom operators or UDFs (User-Defined Functions).
In order to support checkpointing with finished tasks, we adjusted the
[task lifecycle]({{< ref "docs/internals/task_lifecycle" >}}) and introduced the
{{< javadoc file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" name="StreamOperator#finish" >}}
method. This method is expected to be a clear cutoff point for flushing any remaining buffered
state. In most cases, all checkpoints taken after the `finish` method has been called should be
empty and should not contain any buffered data, since there will be no way to emit this data. One
notable exception is if your operator holds pointers to transactions in external systems (e.g. in
order to implement exactly-once semantics). In such a case, checkpoints taken after invoking the
`finish()` method should keep a pointer to the last transaction(s) that will be committed in the
final checkpoint before the operator is closed. Good built-in examples of this are exactly-once
sinks and the `TwoPhaseCommitSinkFunction`.
````
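It might help readers to see the `finish()` contract in a small, self-contained sketch. The classes below are hypothetical and deliberately do not use Flink's API: `BufferingOperator` only mimics an operator whose `finish()` flushes its buffer, and `TransactionalSink` loosely models a two-phase-commit sink, assuming made-up `txn-N` transaction names and a commit method named after `CheckpointListener#notifyCheckpointComplete`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the finish() contract. These classes are NOT Flink's
// StreamOperator API; they only mimic the lifecycle calls discussed above.
class BufferingOperator {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private boolean finished = false;

    void processElement(String record) {
        if (finished) throw new IllegalStateException("no records may arrive after finish()");
        buffer.add(record);
        if (buffer.size() >= 3) flush(); // assumption: flush in batches of three
    }

    // Clear cutoff point: whatever is still buffered is emitted here.
    void finish() {
        flush();
        finished = true;
    }

    // Checkpointed state = records not yet emitted; always empty once finish() ran.
    List<String> snapshotState() { return new ArrayList<>(buffer); }

    List<String> emitted() { return emitted; }

    private void flush() {
        emitted.addAll(buffer);
        buffer.clear();
    }
}

// Hypothetical two-phase-commit sink: the exception to the rule, because its
// checkpoints must still reference transactions that are not committed yet.
class TransactionalSink {
    private final TreeMap<Long, String> pending = new TreeMap<>(); // checkpoint id -> txn
    private final List<String> committed = new ArrayList<>();
    private String openTxn = "txn-1";
    private int nextTxn = 2;
    private boolean finished = false;

    // No new transaction is opened after finish().
    void finish() { finished = true; }

    // Pre-commits the open transaction; the state is every txn not yet committed.
    List<String> snapshotState(long checkpointId) {
        if (openTxn != null) {
            pending.put(checkpointId, openTxn);
            openTxn = finished ? null : "txn-" + nextTxn++;
        }
        return new ArrayList<>(pending.values());
    }

    // Commits all transactions pre-committed up to the completed checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, String> done = pending.headMap(checkpointId, true);
        committed.addAll(done.values());
        done.clear(); // clearing the view also removes the entries from `pending`
    }

    List<String> committed() { return committed; }
}

class FinishContractDemo {
    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        op.processElement("a");
        op.processElement("b");
        System.out.println("buffered before finish: " + op.snapshotState());
        op.finish();
        System.out.println("buffered after finish: " + op.snapshotState());

        TransactionalSink sink = new TransactionalSink();
        sink.snapshotState(1);
        sink.finish();
        System.out.println("sink state after finish: " + sink.snapshotState(2));
        sink.notifyCheckpointComplete(2);
        System.out.println("committed: " + sink.committed());
    }
}
```

In this model the plain operator's snapshot is empty once `finish()` has run, whereas the sink's snapshot taken after `finish()` still lists `txn-1` and `txn-2` until the checkpoint that covers them completes.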
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -211,4 +223,46 @@ Flink currently only provides processing guarantees for jobs without iterations.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
+
+### Operator state
+
+The aforementioned requirement implies certain contract especially on the operator state. First and
+foremost it forced us to implement a special handling for `UnionListState`. Often a `UnionListState`
+has been used to implement a global view over offsets in external system (e.g. we use it to store
+current offsets of Kafka partitions). If we had discarded a state for just a single subtask that had
+it's `finish` method called, we would've lost offsets for partitions it had been assigned. In order
+to work this problem around, we let checkpoints succeed, only if either none or all subtasks
+finished that use `UnionListState`.
+
+We have not seen a `ListState` used in a similar way, but you must be aware that any state checkpointed
+after the `finish` method will be discarded and not available after a restore (if the corresponding
+checkpoint succeeds). We truly believe that any operator that is prepared to be rescaled should work
+well with tasks that partially finish (only a subset of it tasks finish). Restoring from a checkpoint
+which has some subtasks finished is equivalent to restoring such a task with the number of new
+subtasks equal to the number of finished tasks.
Review comment:
```suggestion
### How does this impact the operator state?

There is special handling for `UnionListState`, which has often been used to implement a global
view over offsets in an external system (e.g. storing the current offsets of Kafka partitions). If
we had discarded the state of a single subtask that had its `finish` method called, we would have
lost the offsets of the partitions that had been assigned to it. In order to work around this
problem, we let checkpoints succeed only if either none or all of the subtasks that use
`UnionListState` have finished.

We have not seen `ListState` used in a similar way, but you should be aware that any state
checkpointed after the `finish` method has been called will be discarded and will not be available
after a restore.

Any operator that is prepared to be rescaled should work well with tasks that partially finish.
Restoring from a checkpoint where only a subset of the subtasks have finished is equivalent to
restoring such a task with the number of new subtasks equal to the number of finished tasks.
```
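A similarly hedged sketch of the `UnionListState` rule may make the reasoning easier to follow. `UnionListStateRule` is a hypothetical helper, not part of Flink: it models the union view that every subtask receives on restore (assuming one map of Kafka partition to offset per subtask) and the "none or all finished" condition under which a checkpoint may succeed.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the UnionListState rule described above; this is not
// Flink's API. On restore, every subtask sees the union of all entries.
final class UnionListStateRule {
    private UnionListStateRule() {}

    // Union view over per-subtask entries, e.g. Kafka partition -> offset.
    static Map<String, Long> unionView(List<Map<String, Long>> perSubtask) {
        Map<String, Long> union = new TreeMap<>();
        perSubtask.forEach(union::putAll);
        return union;
    }

    // A checkpoint may succeed only if none or all of the subtasks that use
    // UnionListState have finished (each entry: true = that subtask finished).
    static boolean checkpointMaySucceed(List<Boolean> subtaskFinished) {
        boolean noneFinished = !subtaskFinished.contains(true);
        boolean allFinished = !subtaskFinished.contains(false);
        return noneFinished || allFinished;
    }

    public static void main(String[] args) {
        Map<String, Long> subtask0 = Map.of("partition-0", 42L);
        Map<String, Long> subtask1 = Map.of("partition-1", 17L);
        // Both subtasks contribute: both offsets are part of the union.
        System.out.println(unionView(List.of(subtask0, subtask1)));
        // If subtask 1's state were dropped after finish(), partition-1 would be lost.
        System.out.println(unionView(List.of(subtask0)));

        System.out.println(checkpointMaySucceed(List.of(false, false))); // true
        System.out.println(checkpointMaySucceed(List.of(true, false)));  // false
        System.out.println(checkpointMaySucceed(List.of(true, true)));   // true
    }
}
```

The second `unionView` call shows why a partially finished operator is the problematic case: the remaining subtasks would restore a union that silently lacks `partition-1`.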
--
This is an automated message from the Apache Git Service.