gaoyunhaii commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-858512129


   Hi Dawid, thanks a lot for the suggestion! After thinking about it some more, I 
agree that we will eventually drain the records from all the channels, so we 
can trigger the remaining checkpoints directly in the `StreamTask` and do not 
need to expose the `BarrierHandler`. 
   
   With the above approach, it seems we would reject a checkpoint if not all 
channels have finished, right? Perhaps we could change it a bit to hold these 
checkpoints until all the channels are drained, namely:
   
   ```
   private final List<CheckpointBarrier> pendingCheckpoints = new ArrayList<>();
   
   private boolean triggerCheckpointAsyncInMailbox(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
           throws Exception {
       FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
       try {
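           if (allInputsFinished()) {
               return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
           } else {
               // Hold the checkpoint until all inputs are finished instead of declining it.
               pendingCheckpoints.add(
                       new CheckpointBarrier(
                               checkpointMetaData.getCheckpointId(),
                               checkpointMetaData.getTimestamp(),
                               checkpointOptions));
               return true;
           }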
       } catch (Exception e) {
          ...
       } finally {
           ...
       }
   }
   
   private void executeInvoke() throws Exception {
       runMailboxLoop();
       ensureNotCanceled();
   
       triggerAllPendingCheckpoints();
   
       afterInvoke();
   }
   
   
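   private void triggerAllPendingCheckpoints() throws Exception {
       // Trigger the held checkpoints in the order they were received.
       for (CheckpointBarrier barrier : pendingCheckpoints) {
           triggerCheckpointInRootNode(
                   new CheckpointMetaData(barrier.getId(), barrier.getTimestamp()),
                   barrier.getCheckpointOptions());
       }
       pendingCheckpoints.clear();
   }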
   
   ```
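
   To make the intended ordering concrete, here is a minimal Flink-free sketch of the same hold-then-trigger idea. Everything in it (`PendingCheckpointDemo`, `finishAllInputs`, using plain checkpoint ids instead of barriers) is a hypothetical stand-in for illustration, not the actual `StreamTask` code:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: checkpoints requested before all inputs finish are held, not rejected. */
public class PendingCheckpointDemo {
    // Hypothetical stand-ins: plain ids replace CheckpointBarrier / CheckpointMetaData.
    private final List<Long> pendingCheckpoints = new ArrayList<>();
    private final List<Long> triggeredCheckpoints = new ArrayList<>();
    private boolean allInputsFinished = false;

    /** Returns true when the request is accepted, either triggered now or held for later. */
    boolean triggerCheckpoint(long checkpointId) {
        if (allInputsFinished) {
            triggeredCheckpoints.add(checkpointId);
        } else {
            pendingCheckpoints.add(checkpointId);
        }
        return true;
    }

    /** Models the point after the mailbox loop: inputs are drained, so flush held checkpoints. */
    void finishAllInputs() {
        allInputsFinished = true;
        triggeredCheckpoints.addAll(pendingCheckpoints); // preserves arrival order
        pendingCheckpoints.clear();
    }

    List<Long> pending() {
        return pendingCheckpoints;
    }

    List<Long> triggered() {
        return triggeredCheckpoints;
    }

    public static void main(String[] args) {
        PendingCheckpointDemo task = new PendingCheckpointDemo();
        task.triggerCheckpoint(1L);
        task.triggerCheckpoint(2L);
        System.out.println("held before inputs finish: " + task.pending());
        task.finishAllInputs();
        task.triggerCheckpoint(3L);
        System.out.println("triggered after inputs finish: " + task.triggered());
    }
}
```

   In this toy version, checkpoints 1 and 2 are held while inputs are still open, get triggered in arrival order once the inputs finish, and checkpoint 3 is then triggered directly.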
   
   Do you think this would be OK?



