infoverload commented on a change in pull request #17135:
URL: https://github.com/apache/flink/pull/17135#discussion_r707108617



##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -83,6 +83,8 @@ Other parameters for checkpointing include:
 
   - *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref 
"docs/ops/state/unaligned_checkpoints" >}}) to greatly reduce checkpointing 
times under backpressure. Only works for exactly-once checkpoints and with 
number of concurrent checkpoints of 1.
 
+  - *checkpoints with finished tasks*: You can enable an experimental feature 
to continue performing checkpoints even if parts of the DAG have finished 
processing all of their records. Before doing so please read through the 
[important considerations](#checkpointing-with-parts-of-the-graph-finished).
+

Review comment:
       ```suggestion
     - *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref 
"docs/ops/state/unaligned_checkpoints" >}}) to greatly reduce checkpointing 
times under backpressure. This only works for exactly-once checkpoints and 
when the number of concurrent checkpoints is 1.
   
     - *checkpoints with finished tasks*: You can enable an experimental 
feature to continue performing checkpoints even if parts of the DAG have 
finished processing all of their records. Before doing so, please read through 
some [important 
considerations](#checkpointing-with-parts-of-the-graph-finished).
   
   ```
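
For context, this is roughly how the surrounding checkpoint options are set 
through the `CheckpointConfig` API (a minimal sketch; the 10 second interval is 
just an illustrative value):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 10 seconds with exactly-once guarantees.
env.enableCheckpointing(10_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Unaligned checkpoints require exactly-once mode and at most one concurrent checkpoint.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableUnalignedCheckpoints();
```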

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -211,4 +223,46 @@ Flink currently only provides processing guarantees for 
jobs without iterations.
 
 Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
 
+## Checkpointing with parts of the graph finished *(BETA)*
+
+Starting from Flink 1.14 it is possible to continue performing checkpoints 
even if parts of the job
+graph have finished processing all data, which might happen if it contains 
bounded sources. The
+feature must be enabled via a feature flag:
+
+```java
+Configuration config = new Configuration();
+config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
 true);
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+```
+
+Once tasks/subtasks are finished they don't contribute to the checkpoints any 
longer. It is an
+important observation that puts certain requirements on the implementation of 
any custom operators
+or UDFs. In order to support checkpointing with tasks that finish we adjusted 
the [task lifecycle]({{< ref "docs/internals/task_lifecycle" >}})
+and introduced the {{< javadoc 
file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" 
name="StreamOperator#finish" >}}
+method. The method is expected to be a clear cutoff point for flushing any 
remaining buffered state.
+All checkpoints taken after the `finish` method has been called should be in 
most cases empty and
+shouldn't contain any buffered data, as there will be no way to emit this 
data. One notable
+exception is if your operator has some pointers to transactions in external 
systems, for example in
+order to implement the exactly-once semantic. In such a case, checkpoints 
taken after invoking `finish()`
+method should keep a pointer to the last transaction(s) that will be committed 
in the final checkpoint
+before the operator is closed. A good built-in example of this are 
exactly-once sinks and the 
+`TwoPhaseCommitSinkFunction`. What does it mean in more details?

Review comment:
       ```suggestion
   ## Checkpointing with parts of the job graph finished *(BETA)*
   
   Starting from Flink 1.14, it is possible to continue performing checkpoints 
even if parts of the job
   graph have finished processing all the data. This might happen if there are 
bounded sources. This
   feature must be enabled via a feature flag:
   
   ```java
   Configuration config = new Configuration();
   
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
   ```
   
   Once the tasks/subtasks are finished, they do not contribute to the 
checkpoints any longer. This is an important consideration when implementing 
any custom operators or UDFs (User-Defined Functions). 
   
   In order to support checkpointing with tasks that finished, we adjusted the 
[task lifecycle]({{< ref "docs/internals/task_lifecycle" >}}) and introduced 
the {{< javadoc 
file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" 
name="StreamOperator#finish" >}} method. This method is expected to be a clear 
cutoff point for flushing any remaining buffered state. All checkpoints taken 
after the `finish` method has been called should be empty (in most cases) and 
should not contain any buffered data since there will be no way to emit this 
data. One notable exception is if your operator has some pointers to 
transactions in external systems (e.g. in order to implement exactly-once 
semantics). In such a case, checkpoints taken after invoking the `finish()` 
method should keep a pointer to the last transaction(s) that will be committed 
in the final checkpoint before the operator is closed. Good built-in examples 
of this are exactly-once sinks and the `TwoPhaseCommitSinkFunction`. 
   ```
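
As a rough illustration of the `finish()` cutoff described above, a minimal 
sketch of an operator that flushes its remaining buffer could look like this 
(the class name `BufferingOperator` is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Buffers records and emits them in finish(), so checkpoints taken after
// finish() no longer need to carry any buffered data.
public class BufferingOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        buffer.add(element.getValue());
    }

    @Override
    public void finish() throws Exception {
        // Clear cutoff point: flush everything that is still buffered.
        for (String value : buffer) {
            output.collect(new StreamRecord<>(value));
        }
        buffer.clear();
    }
}
```

(A real operator would of course also keep the buffer in managed state so it 
survives failures that happen before `finish()` is called.)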

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -211,4 +223,46 @@ Flink currently only provides processing guarantees for 
jobs without iterations.
 
 Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
 
+## Checkpointing with parts of the graph finished *(BETA)*
+
+Starting from Flink 1.14 it is possible to continue performing checkpoints 
even if parts of the job
+graph have finished processing all data, which might happen if it contains 
bounded sources. The
+feature must be enabled via a feature flag:
+
+```java
+Configuration config = new Configuration();
+config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
 true);
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+```
+
+Once tasks/subtasks are finished they don't contribute to the checkpoints any 
longer. It is an
+important observation that puts certain requirements on the implementation of 
any custom operators
+or UDFs. In order to support checkpointing with tasks that finish we adjusted 
the [task lifecycle]({{< ref "docs/internals/task_lifecycle" >}})
+and introduced the {{< javadoc 
file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" 
name="StreamOperator#finish" >}}
+method. The method is expected to be a clear cutoff point for flushing any 
remaining buffered state.
+All checkpoints taken after the `finish` method has been called should be in 
most cases empty and
+shouldn't contain any buffered data, as there will be no way to emit this 
data. One notable
+exception is if your operator has some pointers to transactions in external 
systems, for example in
+order to implement the exactly-once semantic. In such a case, checkpoints 
taken after invoking `finish()`
+method should keep a pointer to the last transaction(s) that will be committed 
in the final checkpoint
+before the operator is closed. A good built-in example of this are 
exactly-once sinks and the 
+`TwoPhaseCommitSinkFunction`. What does it mean in more details?
+
+### Operator state
+
+The aforementioned requirement implies certain contract especially on the 
operator state. First and
+foremost it forced us to implement a special handling for `UnionListState`. 
Often a `UnionListState`
+has been used to implement a global view over offsets in external system (e.g. 
we use it to store
+current offsets of Kafka partitions). If we had discarded a state for just a 
single subtask that had
+it's `finish` method called, we would've lost offsets for partitions it had 
been assigned. In order
+to work this problem around, we let checkpoints succeed, only if either none 
or all subtasks
+finished that use `UnionListState`.
+
+We have not seen a `ListState` used in a similar way, but you must be aware 
that any state checkpointed
+after the `finish` method will be discarded and not available after a restore 
(if the corresponding
+checkpoint succeeds). We truly believe that any operator that is prepared to 
be rescaled should work
+well with tasks that partially finish (only a subset of it tasks finish). 
Restoring from a checkpoint
+which has some subtasks finished is equivalent to restoring such a task with 
the number of new
+subtasks equal to the number of finished tasks.

Review comment:
       ```suggestion
   ### How does this impact the operator state?
   
   There is special handling for `UnionListState`, which has often been used 
to implement a global view over offsets in an external system (e.g. storing 
the current offsets of Kafka partitions). If we had discarded the state for a 
single subtask that had its `finish` method called, we would have lost the 
offsets for the partitions it had been assigned. In order to work around this 
problem, we let checkpoints succeed only if either none or all of the subtasks 
that use `UnionListState` are finished.
   
   We have not seen `ListState` used in a similar way, but you should be aware 
that any state checkpointed after the `finish` method has been called will be 
discarded and will not be available after a restore. 
   
   Any operator that is prepared to be rescaled should work well with tasks 
that partially finish. Restoring from a checkpoint where only a subset of tasks 
finished is equivalent to restoring such a task with the number of new subtasks 
equal to the number of finished tasks.
   ```
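
A minimal sketch of the `UnionListState` pattern described above (class and 
state names are hypothetical, loosely modeled on the Kafka-offset use case):

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Keeps its offset in union list state, so every subtask sees the union of all
// offsets after a restore. Checkpoints with finished tasks only succeed if
// either none or all subtasks using this state have finished.
public abstract class OffsetTrackingSource
        implements SourceFunction<String>, CheckpointedFunction {

    private transient ListState<Long> offsetsState;
    private long currentOffset;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetsState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<>("offsets", Long.class));
        for (Long offset : offsetsState.get()) {
            // On restore every subtask receives all offsets; picking the ones
            // this subtask is responsible for is omitted here.
            currentOffset = Math.max(currentOffset, offset);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetsState.clear();
        offsetsState.add(currentOffset);
    }
}
```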



