pnowojski commented on a change in pull request #12722:
URL: https://github.com/apache/flink/pull/12722#discussion_r443667751



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -382,7 +382,7 @@ public void setPreferCheckpointForRecovery(boolean preferCheckpointForRecovery)
        }
 
        /**
-        * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
+        * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure (experimental).

Review comment:
       As it's stable on our builds, maybe we could label it as more 
production-ready?

##########
File path: docs/concepts/stateful-stream-processing.md
##########
@@ -140,6 +140,8 @@ Keep in mind that everything to do with checkpointing can be done
 asynchronously. The checkpoint barriers don't travel in lock step and
 operations can asynchronously snapshot their state.
 
+Since Flink 1.11, checkpoints can be taken with or without alignment. In the 
+following, we describe aligned checkpoints first.

Review comment:
       following section?

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to backpressure. Then, checkpointing time becomes mostly
+independent of the end-to-end latency. Be aware unaligned checkpointing
+adds to I/O to the state backends, so you shouldn't use it when the I/O to
+the state backend is actually the bottleneck during checkpointing.
+
+We flagged unaligned checkpoints as experimental as it currently has the
+following limitations:
+
+- You cannot rescale from unaligned checkpoints. You have to take a savepoint 
+before rescaling. Savepoints are always aligned independent of the alignment
+setting of checkpoints.
+- Flink currently does not support concurrent unaligned checkpoints. However, 
+due to the more predictable and shorter checkpointing times, concurrent 
+checkpoints might not be needed at all.
+- Unaligned checkpoints may produce incorrect results for the following reasons:
+
+Currently, Flink generates the watermark as a first step of recovery instead of
+storing the latest watermark in the operators to ease rescaling. In unaligned 
+checkpoints, that means on recovery, **Flink generates watermarks after it 
+restores in-flight data**. If your pipeline uses an **operator that applies the
+latest watermark on each record**, it will produce **incorrect results** during
+recovery if the watermark is not directly or indirectly part of the operator 
+state. Thus, **SQL OVER operator should not be used with unaligned
+checkpoints**, while window operators are safe to use. The workaround is to
+store the watermark in the operator state. If rescaling may occur, watermarks
+should be stored per key-group in a union-state. We mostly likely will
+implement this approach as a general solution (didn't make it into Flink 
+1.11.0).

Review comment:
       I think this paragraph is a bit too strong. As far as I understand, it's 
not that unaligned checkpoints (UC) will produce incorrect results, just that 
some records might not be treated as late data during reprocessing, right?
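
       To make the concern concrete, here is a minimal sketch in plain Java (no 
Flink classes). `LateFilter`, `normalRun`, and `recoveryRun` are hypothetical 
names for illustration only, and the operator is assumed to classify a record as 
late when its timestamp is at or below the current watermark and to not keep 
the watermark in its state. It only models the ordering described in the 
paragraph above (in-flight data restored before the watermark is regenerated), 
not Flink's actual recovery code.

```java
import java.util.ArrayList;
import java.util.List;

public class LateDataOnRecovery {

    /** Hypothetical operator: applies the latest watermark to each record. */
    static final class LateFilter {
        // Not part of the checkpointed state in this sketch, so it is reset on recovery.
        long watermark = Long.MIN_VALUE;
        final List<Long> accepted = new ArrayList<>();
        final List<Long> late = new ArrayList<>();

        void onWatermark(long wm) {
            watermark = Math.max(watermark, wm);
        }

        void onRecord(long timestamp) {
            // A record is late if the watermark has already passed its timestamp.
            if (timestamp <= watermark) {
                late.add(timestamp);
            } else {
                accepted.add(timestamp);
            }
        }
    }

    /** Regular processing: the watermark arrives before the records. */
    static LateFilter normalRun() {
        LateFilter op = new LateFilter();
        op.onWatermark(100);
        op.onRecord(90);   // classified as late
        op.onRecord(120);  // accepted
        return op;
    }

    /** Recovery: restored in-flight records are processed before the watermark is regenerated. */
    static LateFilter recoveryRun() {
        LateFilter op = new LateFilter(); // watermark starts at Long.MIN_VALUE again
        op.onRecord(90);   // accepted, although it was late before the failure
        op.onRecord(120);  // accepted
        op.onWatermark(100);
        return op;
    }

    public static void main(String[] args) {
        System.out.println("late in normal run:   " + normalRun().late);
        System.out.println("late in recovery run: " + recoveryRun().late);
    }
}
```

       In this sketch no record is lost or duplicated; the only difference 
between the two runs is that the record with timestamp 90 is no longer 
classified as late after recovery.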

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to backpressure. Then, checkpointing time becomes mostly
+independent of the end-to-end latency. Be aware unaligned checkpointing
+adds to I/O to the state backends, so you shouldn't use it when the I/O to
+the state backend is actually the bottleneck during checkpointing.
+
+We flagged unaligned checkpoints as experimental as it currently has the
+following limitations:

Review comment:
       I would also mention that `flatMap` operators can lead to unbounded 
spilled data.

##########
File path: docs/concepts/stateful-stream-processing.md
##########
@@ -242,6 +246,48 @@ updates to that state.
 See [Restart Strategies]({% link dev/task_failure_recovery.md
 %}#restart-strategies) for more information.
 
+### Unaligned Checkpointing
+
+Starting with Flink 1.11, checkpointing can also be performed unaligned.
+The basic idea is that checkpoints can overtake all in-flight data as long as 
+the in-flight data becomes part of the operator state.
+
+Note that this approach is actually closer to the [Chandy-Lamport algorithm
+](http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf), but
+Flink still inserts the barrier in the sources to avoid overloading the
+checkpoint coordinator.
+
+<div style="text-align: center">
+  <img src="{% link fig/stream_unaligning.svg %}" alt="Unaligned checkpointing" style="width:100%; padding-top:10px; padding-bottom:10px;" />
+</div>
+
+The figure depicts how an operator handles unaligned checkpoint barriers:
+
+- The operator reacts on the first barrier that is stored in its input buffers.
+- It immediately forwards the barrier to the downstream operator by adding it 
+  to the end of the output buffers.
+- The operator marks all overtaken records to be stored asynchronously and 
+  creates a snapshot of its own state.
+ 
+Consequently, the operator only briefly stops the processing of input to mark
+the buffers, forwards the barrier, and creates the snapshot of the other state.
+  
+Unaligned checkpointing ensures that barriers are arriving at the sink as fast 
+as possible. It's especially suited for applications with at least one slow 
+moving data path, where alignment times can reach hours. However, since it's
+adding additional I/O pressure to state backends, it doesn't help when the I/O

Review comment:
       `I/O pressure to state backends` -> ` I/O pressure`, as it's not using 
state backends per se.

##########
File path: docs/ops/state/checkpoints.md
##########
@@ -113,4 +113,50 @@ above).
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 {% endhighlight %}
 
+### Unaligned checkpoints
+
+Starting with Flink 1.11, checkpoints can be unaligned (experimental). 
+[Unaligned checkpoints]({% link concepts/stateful-stream-processing.md
+%}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
+buffers) as part of the checkpoint state, which allows checkpoint barriers to
+overtake these buffers. Thus, the checkpoint duration becomes independent of the
+current throughput as checkpoint barriers are effectively not embedded into 
+the stream of data anymore.
+
+You should use unaligned checkpoints if your checkpointing durations are very
+high due to backpressure. Then, checkpointing time becomes mostly
+independent of the end-to-end latency. Be aware unaligned checkpointing
+adds to I/O to the state backends, so you shouldn't use it when the I/O to
+the state backend is actually the bottleneck during checkpointing.
+
+We flagged unaligned checkpoints as experimental as it currently has the
+following limitations:
+
+- You cannot rescale from unaligned checkpoints. You have to take a savepoint 
+before rescaling. Savepoints are always aligned independent of the alignment
+setting of checkpoints.
+- Flink currently does not support concurrent unaligned checkpoints. However, 
+due to the more predictable and shorter checkpointing times, concurrent 
+checkpoints might not be needed at all.

Review comment:
       We should mention here that aligned savepoints also cannot happen 
concurrently with an unaligned checkpoint.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

