morsapaes commented on a change in pull request #12722:
URL: https://github.com/apache/flink/pull/12722#discussion_r443372253



##########
File path: docs/concepts/stateful-stream-processing.md
##########
@@ -123,7 +123,7 @@ provided by Flink's connectors.
 snapshots, we use the words *snapshot* and *checkpoint* interchangeably. Often
 we also use the term *snapshot* to mean either *checkpoint* or *savepoint*.
 
-### Checkpointing
+### Aligned Checkpointing

Review comment:
       Hmm, I'm not sure I'd change this title; it might be more confusing than 
helpful. The sentence you have below is clarifying enough ("Since Flink 1.11, 
checkpoints can be taken with or without alignment. In the following, we 
describe aligned checkpoints first.").

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to back-pressure. Then, checkpointing time becomes mostly

Review comment:
       ```suggestion
   high due to backpressure. Then, checkpointing time becomes mostly
   ```

##########
File path: docs/concepts/stateful-stream-processing.md
##########
@@ -242,6 +246,42 @@ updates to that state.
 See [Restart Strategies]({% link dev/task_failure_recovery.md
 %}#restart-strategies) for more information.
 
+### Unaligned Checkpointing
+
+Starting with Flink 1.11, checkpointing can also be performed unaligned.
+The basic idea is that checkpoints can overtake all in-flight data as long as 
+the in-flight data becomes part of the operator state.
+
+<div style="text-align: center">
+  <img src="{% link fig/stream_unaligning.svg %}" alt="Unaligned checkpointing" style="width:100%; padding-top:10px; padding-bottom:10px;" />
+</div>
+
+The figure depicts how an operator handle unaligned checkpoint barriers:
+
+- The operator reacts on the first barrier that is stored in input buffers.
+- It immediately forwards the barrier to the downstream operator by adding it 
+  to the end of the output buffers.
+- The operator marks all overtaken buffers, which are asynchronously stored in
+  the state backend together with the other operator state.
+- It only briefly stops the processing of input to mark the buffers, forward 
+  the barrier and create the snapshot of the other state.
+  
+Unaligned checkpointing ensures that barriers are arriving at the sink as fast 
+as possible. It's especially suited for application with at least one slow 

Review comment:
       ```suggestion
   as possible. It's especially suited for applications with at least one slow 
   ```
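
Side note for readers of this thread: the four barrier-handling steps in the bullet list of the hunk above can be sketched as a toy model. This is a minimal Python sketch, not Flink code. `ToyOperator`, `BARRIER`, and `handle_unaligned_barrier` are hypothetical names, the operator is assumed to have a single input and a single output queue, and the "other operator state" is reduced to a running sum. The sketch also takes "the end of the output buffers" literally.

```python
from collections import deque

BARRIER = "BARRIER"  # hypothetical marker standing in for a checkpoint barrier


class ToyOperator:
    """Toy model of one operator: one input queue, one output queue, a running sum."""

    def __init__(self, input_records, state=0):
        self.input = deque(input_records)
        self.output = deque()
        self.state = state  # stands in for "the other operator state"

    def process_one(self):
        """Normal processing: consume one record, update the state, emit the record."""
        record = self.input.popleft()
        self.state += record
        self.output.append(record)

    def handle_unaligned_barrier(self):
        """React to the first barrier stored anywhere in the input buffers."""
        buffered = list(self.input)
        position = buffered.index(BARRIER)  # raises ValueError if no barrier is queued
        # Records queued ahead of the barrier are overtaken and become in-flight data.
        overtaken = buffered[:position]
        # Take the barrier out of the input and forward it right away.
        del self.input[position]
        self.output.append(BARRIER)
        # The snapshot holds the operator state plus the overtaken records.
        return {"operator_state": self.state, "in_flight": overtaken}


if __name__ == "__main__":
    op = ToyOperator([1, 2, BARRIER, 3])
    snapshot = op.handle_unaligned_barrier()
    print(snapshot, list(op.output), list(op.input))
```

In this model, restoring `snapshot["operator_state"]` and replaying `snapshot["in_flight"]` leads to the same sum as processing the overtaken records normally, which is why the in-flight data has to be persisted along with the state.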

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to back-pressure. Then, checkpointing time becomes mostly

Review comment:
       Just for the sake of consistency in spelling.

##########
File path: docs/concepts/stateful-stream-processing.md
##########
@@ -242,6 +246,42 @@ updates to that state.
 See [Restart Strategies]({% link dev/task_failure_recovery.md
 %}#restart-strategies) for more information.
 
+### Unaligned Checkpointing
+
+Starting with Flink 1.11, checkpointing can also be performed unaligned.
+The basic idea is that checkpoints can overtake all in-flight data as long as 
+the in-flight data becomes part of the operator state.
+
+<div style="text-align: center">
+  <img src="{% link fig/stream_unaligning.svg %}" alt="Unaligned checkpointing" style="width:100%; padding-top:10px; padding-bottom:10px;" />
+</div>
+
+The figure depicts how an operator handle unaligned checkpoint barriers:

Review comment:
       ```suggestion
   The figure depicts how an operator handles unaligned checkpoint barriers:
   ```

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to back-pressure. Then, checkpointing time becomes mostly
+independent of the end-to-end latency. Be aware unaligned checkpointing
+adds to I/O to the state backends, so you shouldn't use it when the I/O to
+the state backend is actually the bottleneck during checkpointing.
+
+We flagged unaligned checkpoints as experimental as it currently has the
+following shortcomings:
+
+- You cannot rescale from unaligned checkpoints. You have to take a savepoint 
+before rescaling. Savepoints are always aligned independent of the alignment
+setting of checkpoints.
+- Flink currently does not support concurrent unaligned checkpoints. However, 
+due to the more predictable and shorter checkpointing times, concurrent 
+checkpoints might not be needed at all.
+- Unaligned checkpoints may produce incorrect results for the following reasons.

Review comment:
       ```suggestion
   - Unaligned checkpoints may produce incorrect results for the following reasons:
   ```

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to back-pressure. Then, checkpointing time becomes mostly
+independent of the end-to-end latency. Be aware unaligned checkpointing
+adds to I/O to the state backends, so you shouldn't use it when the I/O to
+the state backend is actually the bottleneck during checkpointing.
+
+We flagged unaligned checkpoints as experimental as it currently has the
+following shortcomings:
+
+- You cannot rescale from unaligned checkpoints. You have to take a savepoint 
+before rescaling. Savepoints are always aligned independent of the alignment
+setting of checkpoints.
+- Flink currently does not support concurrent unaligned checkpoints. However, 
+due to the more predictable and shorter checkpointing times, concurrent 
+checkpoints might not be needed at all.
+- Unaligned checkpoints may produce incorrect results for the following reasons.
+
+Currently, Flink generates the watermark as a first step of recovery instead of 
+storing the latest watermark in the operators to ease rescaling. In unaligned 
+checkpoints, that means on recovery, **Flink generates watermarks after it 
+restores in-flight data**. If your pipeline uses an **operator that applies the
+latest watermark on each record**, it will produce **incorrect results** during 
+recovery if the watermark is not directly or indirectly part of the operator 
+state. Thus, **SQL OVER operator should not be used with unaligned
+checkpoints**, while window operators are safe to use. The work-around is to

Review comment:
       ```suggestion
   checkpoints**, while window operators are safe to use. The workaround is to
   ```
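
Side note for readers of this thread: the watermark caveat in the hunk above can be illustrated with a toy model. This is a minimal Python sketch under stated assumptions, not Flink code. `LatestWatermarkTagger` and `NO_WATERMARK` are hypothetical names, and the operator deliberately keeps its latest watermark outside of any snapshot, which is the situation the paragraph warns about.

```python
import math

NO_WATERMARK = -math.inf  # hypothetical stand-in for "no watermark seen yet"


class LatestWatermarkTagger:
    """Toy operator that applies the latest watermark to each record."""

    def __init__(self):
        # Assumption: the watermark is NOT part of the operator state,
        # so a freshly recovered operator starts without one.
        self.latest_watermark = NO_WATERMARK

    def on_watermark(self, timestamp):
        """Advance the watermark; it never moves backwards."""
        self.latest_watermark = max(self.latest_watermark, timestamp)

    def on_record(self, record):
        """Emit the record together with the watermark known at this moment."""
        return (record, self.latest_watermark)


def normal_run():
    """The watermark arrives before the record is processed."""
    op = LatestWatermarkTagger()
    op.on_watermark(100)
    return op.on_record("a")


def recovery_run():
    """In-flight record is restored first; the watermark is generated afterwards."""
    op = LatestWatermarkTagger()
    result = op.on_record("a")  # restored in-flight data is processed first
    op.on_watermark(100)        # the watermark only shows up after that
    return result


if __name__ == "__main__":
    print(normal_run(), recovery_run())
```

The two runs tag the same record with different watermarks, which is the kind of divergence the paragraph describes for operators that do not keep the watermark, directly or indirectly, in their state.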




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

