rkhachatryan commented on a change in pull request #13827:
URL: https://github.com/apache/flink/pull/13827#discussion_r527794600



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
##########
@@ -235,6 +257,23 @@ int getNumOpenChannels() {
                return numOpenChannels;
        }
 
+       private CheckpointBarrier maybeTimeout(CheckpointBarrier barrier) {
+               CheckpointOptions options = barrier.getCheckpointOptions();
+               boolean shouldTimeout = (options.isTimeoutable()) && (
+                       barrier.getId() == timeoutedBarrierId ||
+                       (System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout());

Review comment:
       After a discussion with @NicoK, @sjwiesman and @alpinegizmo, we decided to:
   1. Base the timeout decision on the alignment start time (when the subtask receives the first barrier of the checkpoint) rather than on the barrier's creation timestamp
   1. By default, propagate this decision downstream; provide an option to disable propagation
   1. In the UI, show the checkpoint type for each subtask; at the checkpoint level, display "unaligned" if at least one subtask performed an unaligned checkpoint (UC)
   1. Consider renaming the `alignment timeout` option to `subtask alignment timeout`
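A minimal sketch of decisions 1 and 2, assuming a hypothetical `SubtaskAlignmentTimeout` helper that is not part of Flink's API: the timeout is measured from the moment this subtask starts aligning instead of from `barrier.getTimestamp()`, and a hypothetical `propagateDownstream` flag stands in for the proposed option to disable propagation. The current time is passed in explicitly so the logic is easy to test.

```java
/**
 * Hypothetical helper (not part of Flink): tracks how long this subtask has
 * been aligning barriers for the current checkpoint.
 */
final class SubtaskAlignmentTimeout {
    private final long alignmentTimeoutMillis;
    // Hypothetical option: forward the timeout decision downstream (on by default).
    private final boolean propagateDownstream;
    // -1 means no alignment is in progress.
    private long alignmentStartMillis = -1L;

    SubtaskAlignmentTimeout(long alignmentTimeoutMillis, boolean propagateDownstream) {
        this.alignmentTimeoutMillis = alignmentTimeoutMillis;
        this.propagateDownstream = propagateDownstream;
    }

    /** Called when the first barrier of a checkpoint arrives at this subtask. */
    void startAlignment(long nowMillis) {
        alignmentStartMillis = nowMillis;
    }

    /** Called once the checkpoint completes or is aborted on this subtask. */
    void endAlignment() {
        alignmentStartMillis = -1L;
    }

    /** True if this subtask has been aligning for longer than the configured timeout. */
    boolean hasTimedOut(long nowMillis) {
        return alignmentStartMillis >= 0
                && nowMillis - alignmentStartMillis > alignmentTimeoutMillis;
    }

    /**
     * Whether barriers forwarded downstream should already be marked as unaligned.
     * With propagation disabled, each downstream subtask applies its own timeout.
     */
    boolean forwardAsUnaligned(long nowMillis) {
        return propagateDownstream && hasTimedOut(nowMillis);
    }
}
```

In a real handler `nowMillis` would come from a clock such as `System.currentTimeMillis()`. Measuring from the local alignment start means a checkpoint that spent a long time travelling through a backpressured pipeline does not immediately time out on arrival at each subtask.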
   
   Considerations:
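   - The overhead of UC (persisting in-flight channel data) should ideally stay localized to the subtasks whose alignment actually times out
   - The less global the decision is, the more difficult it might be to debug UC-related issues
   - In a common scenario, backpressure comes from the sinks; buffers along the whole pipeline will then be full, so each subtask is likely to time out on its own anyway and disabling propagation doesn't make a difference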
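To illustrate decision 3 and the debugging concern above, here is a minimal sketch of how per-subtask checkpoint types could be summarized at the checkpoint level. `SubtaskCheckpointType` and `CheckpointTypeSummary` are hypothetical names for illustration only, not Flink's actual UI or REST model.

```java
import java.util.List;

/** Hypothetical per-subtask checkpoint type, as it might be reported to the UI. */
enum SubtaskCheckpointType { ALIGNED, UNALIGNED }

final class CheckpointTypeSummary {
    private CheckpointTypeSummary() {}

    /** Checkpoint-level type: UNALIGNED if at least one subtask performed an unaligned checkpoint. */
    static SubtaskCheckpointType aggregate(List<SubtaskCheckpointType> subtaskTypes) {
        for (SubtaskCheckpointType type : subtaskTypes) {
            if (type == SubtaskCheckpointType.UNALIGNED) {
                return SubtaskCheckpointType.UNALIGNED;
            }
        }
        return SubtaskCheckpointType.ALIGNED;
    }

    /** Number of subtasks that switched to UC; helps locate where the overhead occurred. */
    static long countUnaligned(List<SubtaskCheckpointType> subtaskTypes) {
        return subtaskTypes.stream()
                .filter(t -> t == SubtaskCheckpointType.UNALIGNED)
                .count();
    }
}
```

Keeping the per-subtask types alongside the aggregate is what makes a non-global decision debuggable: the checkpoint shows as unaligned, and the per-subtask view shows which subtasks actually persisted in-flight data.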
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.