rkhachatryan commented on a change in pull request #13827:
URL: https://github.com/apache/flink/pull/13827#discussion_r527794600
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
##########
@@ -235,6 +257,23 @@ int getNumOpenChannels() {
 		return numOpenChannels;
 	}
+	private CheckpointBarrier maybeTimeout(CheckpointBarrier barrier) {
+		CheckpointOptions options = barrier.getCheckpointOptions();
+		boolean shouldTimeout = (options.isTimeoutable()) && (
+			barrier.getId() == timeoutedBarrierId ||
+			(System.currentTimeMillis() - barrier.getTimestamp()) >
+				options.getAlignmentTimeout());
Review comment:
After a discussion with @NicoK, @sjwiesman and @alpinegizmo, we decided
to:
1. Base the timeout decision on the alignment start time (rather than on
`barrier.getTimestamp()`, as in the code above)
1. By default, propagate this decision downstream; provide an option to
disable propagation
1. In the UI, show the checkpoint type for each subtask; at the checkpoint
level, display "unaligned" if at least one subtask performed an unaligned
checkpoint (UC)
1. Consider renaming the `alignment timeout` option to `subtask alignment
timeout`
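
To make points 1 and 2 concrete, here is a rough sketch of how the decision could look. This is not the actual `SingleCheckpointBarrierHandler` code: the class, the `Mode` enum, the `propagate` flag, and the injected clock are all hypothetical names used only for illustration.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a per-subtask alignment timeout; not Flink's API. */
final class AlignmentTimeoutSketch {
    enum Mode { ALIGNED, UNALIGNED }

    private final long timeoutMs;      // assumed per-subtask alignment timeout
    private final boolean propagate;   // assumed option: forward the decision downstream
    private final LongSupplier clock;  // injectable clock (milliseconds), eases testing
    private long alignmentStartMs;

    AlignmentTimeoutSketch(long timeoutMs, boolean propagate, LongSupplier clock) {
        this.timeoutMs = timeoutMs;
        this.propagate = propagate;
        this.clock = clock;
    }

    /** Called when the first barrier of a checkpoint arrives at this subtask. */
    void startAlignment() {
        alignmentStartMs = clock.getAsLong();
    }

    /** Mode this subtask uses: measured from the local alignment start, not the barrier timestamp. */
    Mode localMode(Mode incoming) {
        if (incoming == Mode.UNALIGNED) {
            return Mode.UNALIGNED; // upstream already timed out and propagated its decision
        }
        long elapsed = clock.getAsLong() - alignmentStartMs;
        return elapsed > timeoutMs ? Mode.UNALIGNED : Mode.ALIGNED;
    }

    /** Mode announced to downstream subtasks; with propagation disabled they decide on their own. */
    Mode outgoingMode(Mode local) {
        return propagate ? local : Mode.ALIGNED;
    }
}
```

With propagation disabled, each downstream subtask starts its own timer on its first barrier, which keeps the UC overhead local at the cost of a less uniform picture across the job.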
Considerations:
- The overhead of UC (persisting channels) should ideally be localized
- The less global the decision is, the more difficult it might be to debug
UC-related issues
- In a common scenario, backpressure comes from the sinks; buffers will be
full everywhere upstream, so disabling propagation doesn't make a difference
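
For the UI rule in point 3, the checkpoint-level label could be derived from the per-subtask types roughly as follows. Again, this is only a sketch: `CheckpointTypeSummary` and `SubtaskCheckpointType` are hypothetical names, not existing Flink classes.

```java
import java.util.List;

/** Hypothetical sketch of the checkpoint-level type shown in the UI. */
final class CheckpointTypeSummary {
    enum SubtaskCheckpointType { ALIGNED, UNALIGNED }

    /** A checkpoint is reported as unaligned if at least one subtask did UC. */
    static SubtaskCheckpointType summarize(List<SubtaskCheckpointType> subtaskTypes) {
        return subtaskTypes.contains(SubtaskCheckpointType.UNALIGNED)
                ? SubtaskCheckpointType.UNALIGNED
                : SubtaskCheckpointType.ALIGNED;
    }
}
```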