dawidwys commented on a change in pull request #16135:
URL: https://github.com/apache/flink/pull/16135#discussion_r656363060



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
##########
@@ -47,7 +48,8 @@
         FORCED_ALIGNED
     }
 
-    public static final long NO_ALIGNMENT_TIME_OUT = Long.MAX_VALUE;
+    /** The maximum possible value is Long.MAX_VALUE 'ns' but API receives 'ms'. */
+    public static final long NO_ALIGNMENT_TIME_OUT = Duration.ofNanos(Long.MAX_VALUE).toMillis();

Review comment:
       Hmm, OK, I understand the problem, but I think the solution is actually wrong. `NO_ALIGNMENT_TIME_OUT` means there should be no alignment timeout, so we should not register the timer at all.
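
To make the unit problem concrete, here is a minimal sketch using only `java.time.Duration` and `java.util.concurrent.TimeUnit`. The class and method names are hypothetical and not part of Flink; it only illustrates that `Long.MAX_VALUE` milliseconds cannot be represented as nanoseconds, while the value proposed in the diff can.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration only; not part of Flink.
class AlignmentTimeoutUnits {

    /** Largest millisecond value that still fits in a long once converted to nanoseconds. */
    static long maxNanosAsMillis() {
        return Duration.ofNanos(Long.MAX_VALUE).toMillis();
    }

    /** Returns true if converting the given milliseconds to nanoseconds overflows a long. */
    static boolean overflowsAsNanos(long millis) {
        try {
            Duration.ofMillis(millis).toNanos(); // throws ArithmeticException on overflow
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxNanosAsMillis());                    // 9223372036854
        System.out.println(overflowsAsNanos(Long.MAX_VALUE));      // true
        System.out.println(overflowsAsNanos(maxNanosAsMillis()));  // false
        // TimeUnit does not throw; it saturates at Long.MAX_VALUE instead.
        System.out.println(
                TimeUnit.MILLISECONDS.toNanos(Long.MAX_VALUE) == Long.MAX_VALUE); // true
    }
}
```

So the new constant avoids the overflow, but it still leaves a timer registered for a timeout that is supposed to mean "never".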
   
   We should rather add a condition in `checkNewCheckpoint`:
   
   ```
   private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException {
       long barrierId = barrier.getId();
       if (currentCheckpointId >= barrierId) {
           return; // This barrier is not the first for this checkpoint.
       }

       // A newer checkpoint has started, so drop any checkpoint still in progress.
       if (isCheckpointPending()) {
           cancelSubsumedCheckpoint(barrierId);
       }
       currentCheckpointId = barrierId;
       numBarriersReceived = 0;
       allBarriersReceivedFuture = new CompletableFuture<>();
       firstBarrierArrivalTime = getClock().relativeTimeNanos();

       // Register the alignment timer only if this checkpoint can actually time out.
       if (alternating && barrier.getCheckpointOptions().isTimeoutable()) {
           registerAlignmentTimer(barrier);
       }
   }
   ```
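
As a self-contained illustration of the guard idea, here is a minimal sketch outside of Flink. Everything in it is an assumption made for the example: the class, the `NO_TIMEOUT` sentinel, and the way `isTimeoutable` is computed are hypothetical stand-ins, not the actual `CheckpointOptions` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the barrier handler; not Flink code.
class AlignmentTimerGuardSketch {

    /** Hypothetical sentinel meaning "never time out", in the spirit of NO_ALIGNMENT_TIME_OUT. */
    static final long NO_TIMEOUT = Long.MAX_VALUE;

    /** Delays of the timers that were registered; stands in for a real timer service. */
    final List<Long> registeredDelaysMs = new ArrayList<>();

    /** Assumed rule: a timeout is usable only if it is positive and not the sentinel. */
    static boolean isTimeoutable(long timeoutMs) {
        return timeoutMs > 0 && timeoutMs != NO_TIMEOUT;
    }

    /** Called for the first barrier of a checkpoint; registers a timer only when needed. */
    void onFirstBarrier(long timeoutMs) {
        if (isTimeoutable(timeoutMs)) {
            registeredDelaysMs.add(timeoutMs);
        }
    }

    public static void main(String[] args) {
        AlignmentTimerGuardSketch handler = new AlignmentTimerGuardSketch();
        handler.onFirstBarrier(NO_TIMEOUT); // sentinel: no timer is registered
        handler.onFirstBarrier(100L);       // real timeout: one timer is registered
        System.out.println(handler.registeredDelaysMs); // [100]
    }
}
```

With a guard like this the sentinel never reaches the timer service, so its numeric value no longer has to survive a ms-to-ns conversion.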
   
   WDYT?



