rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540438232
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
 	private boolean canTimeout(CheckpointBarrier barrier) {
 		return barrier.getCheckpointOptions().isTimeoutable() &&
-			barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+			barrier.getId() <= lastSeenBarrier &&
+			barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+	}
+
+	private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+		return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;
Review comment:
> easier to understand

I agree that it might be true for some users, but not for all. During the [previous discussion](https://github.com/apache/flink/pull/13827#discussion_r527794600), and also the one before it, the consensus was that it's **not** easier to understand. However, we can discuss it again.
(There are also some more technical advantages to "local" timeouts.)

> Secondly your proposed change will not work with single input tasks without active timeouts?

Why? Could you explain?
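
For readers following the thread, here is a minimal sketch of what a "local" timeout check like the one in the diff might look like in isolation. It is not Flink code: the class `LocalAlignmentTimeout`, the method `hasTimedOut`, and the constant `NOT_TIMEOUTABLE` are hypothetical names introduced for illustration. The comment does not list the technical advantages it has in mind. One plausible candidate, stated here as an assumption, is that `System.nanoTime()` readings taken on the same JVM do not depend on wall clocks being synchronized across machines, which a comparison of `System.currentTimeMillis()` against `barrier.getTimestamp()` would.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Flink: sketches a "local" alignment timeout check. */
final class LocalAlignmentTimeout {

    /** Sentinel mirroring getArrivalTime in the diff: the barrier cannot time out. */
    static final long NOT_TIMEOUTABLE = Long.MAX_VALUE;

    private LocalAlignmentTimeout() {}

    /**
     * @param alignmentTimeoutMillis   configured alignment timeout, in milliseconds
     * @param firstBarrierArrivalNanos System.nanoTime() reading taken locally when the first
     *                                 barrier arrived, or NOT_TIMEOUTABLE
     * @param nowNanos                 current System.nanoTime() reading from the same JVM
     */
    static boolean hasTimedOut(
            long alignmentTimeoutMillis, long firstBarrierArrivalNanos, long nowNanos) {
        if (firstBarrierArrivalNanos == NOT_TIMEOUTABLE) {
            return false;
        }
        // Convert with TimeUnit instead of multiplying by 1_000_000 by hand.
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis);
        // Subtract first, then compare: the difference of two nanoTime readings
        // stays meaningful even if the counter wraps around.
        return nowNanos - firstBarrierArrivalNanos > timeoutNanos;
    }
}
```

A caller would record `System.nanoTime()` when the first barrier is announced and later call `LocalAlignmentTimeout.hasTimedOut(timeoutMillis, recordedNanos, System.nanoTime())`. Taking the current time as a parameter is only a choice made here to keep the sketch easy to test.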