dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-857734904


   All right, I will need to think about it a bit more, but even assuming we
can receive an RPC signal while the input channels are still
active/non-finished, I feel we can make it simpler. I don't think the
difference between source and non-source tasks is big enough to justify
extracting a separate class hierarchy. The way I see it, the only difference
is whether all the input gates have finished or not. (If there are no input
gates, all of them are obviously finished ;) ). In that scenario we could
extend `triggerCheckpointAsyncInMailbox` (it was renamed on master) to handle
both cases quite easily.
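A minimal sketch of that single dispatch. It uses hypothetical `Gate` and `Path` types standing in for Flink's `InputGate` and the two trigger paths, so none of this is Flink API. A task takes the "root node" path when all of its gates are finished. `Stream.allMatch` already returns `true` for an empty stream, so a source task with no gates needs no special case.

```java
import java.util.Arrays;

class CheckpointDispatchSketch {

    /** Hypothetical stand-in for an input gate; only the finished flag matters here. */
    interface Gate {
        boolean isFinished();
    }

    /** Hypothetical names for the two ways a triggered checkpoint can be handled. */
    enum Path { ROOT_NODE, NON_ROOT_NODE }

    /** True when every gate is finished; allMatch() is true for an empty array too. */
    static boolean allInputsFinished(Gate[] gates) {
        return Arrays.stream(gates).allMatch(Gate::isFinished);
    }

    /** Same decision for source tasks (no gates) and non-source tasks. */
    static Path choosePath(Gate[] gates) {
        return allInputsFinished(gates) ? Path.ROOT_NODE : Path.NON_ROOT_NODE;
    }

    public static void main(String[] args) {
        Gate finished = () -> true;
        Gate running = () -> false;
        System.out.println(choosePath(new Gate[0]));                      // ROOT_NODE
        System.out.println(choosePath(new Gate[] {finished, finished}));  // ROOT_NODE
        System.out.println(choosePath(new Gate[] {finished, running}));   // NON_ROOT_NODE
    }
}
```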
   
   I don't think we need to expose the `SingleCheckpointBarrierHandler` in any
way. With either my approach or yours, you would need to inject barriers into
the channels on the RPC call, or drain the channels somehow, right? Otherwise
you cannot properly checkpoint the data in the channels or align with it. Or
am I missing something? Therefore you could inject the barrier into, or drain,
the channels from the `StreamTask` level, and the logic in
`SingleCheckpointBarrierHandler` would still be valid.
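A toy model of why the handler logic can stay unchanged. It uses hypothetical `Channel`, `Barrier`, and `AlignmentHandler` classes, not Flink's `SingleCheckpointBarrierHandler`, and ignores overlapping checkpoints.

- The task-level code injects a barrier into every channel on the RPC.
- The handler completes the checkpoint once it has seen a barrier on every channel.
- The handler behaves exactly as it would for barriers that arrived from upstream.

Appending the barrier after the in-flight records corresponds to draining the channel first. An unaligned variant would instead have to capture the records queued behind the barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BarrierInjectionSketch {

    /** Hypothetical barrier event carrying only a checkpoint id. */
    static final class Barrier {
        final long checkpointId;

        Barrier(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Hypothetical input channel: a FIFO of records (plain Strings here) and barriers. */
    static final class Channel {
        final int index;
        final Deque<Object> queue = new ArrayDeque<>();

        Channel(int index) {
            this.index = index;
        }
    }

    /** Hypothetical handler: completes a checkpoint once every channel delivered a barrier. */
    static final class AlignmentHandler {
        private final int numChannels;
        private final Set<Integer> channelsWithBarrier = new HashSet<>();
        final List<Long> completed = new ArrayList<>();

        AlignmentHandler(int numChannels) {
            this.numChannels = numChannels;
        }

        void onBarrier(int channelIndex, Barrier barrier) {
            channelsWithBarrier.add(channelIndex);
            if (channelsWithBarrier.size() == numChannels) {
                completed.add(barrier.checkpointId);
                channelsWithBarrier.clear();
            }
        }
    }

    /** Task-level RPC path: put a barrier behind the in-flight data of every channel. */
    static void injectBarrier(List<Channel> channels, long checkpointId) {
        for (Channel channel : channels) {
            channel.queue.addLast(new Barrier(checkpointId));
        }
    }

    /** Drains all channels, forwarding barriers to the handler; returns the records seen. */
    static int drain(List<Channel> channels, AlignmentHandler handler) {
        int records = 0;
        for (Channel channel : channels) {
            Object next;
            while ((next = channel.queue.pollFirst()) != null) {
                if (next instanceof Barrier) {
                    handler.onBarrier(channel.index, (Barrier) next);
                } else {
                    records++;
                }
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<Channel> channels = List.of(new Channel(0), new Channel(1));
        channels.get(0).queue.addLast("a");
        channels.get(1).queue.addLast("b");
        AlignmentHandler handler = new AlignmentHandler(channels.size());

        injectBarrier(channels, 42L); // triggered by the RPC, not received from upstream
        int records = drain(channels, handler);

        System.out.println(records + " " + handler.completed); // 2 [42]
    }
}
```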
   
   Let me know what you think.
   
   ```
       @Override
       public Future<Boolean> triggerCheckpointAsync(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
   
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           mainMailboxExecutor.execute(
                   () -> {
                       try {
                           result.complete(
                                   triggerCheckpointAsyncInMailbox(
                                           checkpointMetaData, checkpointOptions));
                       } catch (Exception ex) {
                           // Report the failure both via the Future result
                           // and to the mailbox.
                           result.completeExceptionally(ex);
                           throw ex;
                       }
                   },
                   "checkpoint %s with %s",
                   checkpointMetaData,
                   checkpointOptions);
           return result;
       }
   
       private boolean triggerCheckpointAsyncInMailbox(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
           try {
               if (allInputsFinished()) {
                   return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
               } else {
                   throw new UnsupportedOperationException(
                           "We do not support triggering non-root nodes yet.");
               }
           } catch (Exception e) {
               // propagate exceptions only if the task is still in "running" state
               if (isRunning) {
                   throw new Exception(
                           "Could not perform checkpoint "
                                   + checkpointMetaData.getCheckpointId()
                                   + " for operator "
                                   + getName()
                                   + '.',
                           e);
               } else {
                   LOG.debug(
                           "Could not perform checkpoint {} for operator {} while the "
                                   + "invokable was not in state running.",
                           checkpointMetaData.getCheckpointId(),
                           getName(),
                           e);
                   return false;
               }
           } finally {
               FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
           }
       }
   
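       private boolean allInputsFinished() {
           // allMatch() is also true for an empty stream, so the length check
           // only makes the "no input gates" (source task) case explicit.
           return getEnvironment().getAllInputGates().length == 0
                   || Arrays.stream(getEnvironment().getAllInputGates())
                           .allMatch(InputGate::isFinished);
       }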
   
       private boolean triggerCheckpointInRootNode(
               CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
               throws Exception {
           latestAsyncCheckpointStartDelayNanos =
                   1_000_000
                           * Math.max(
                                   0, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
   
           // No alignment if we inject a checkpoint
           CheckpointMetricsBuilder checkpointMetrics =
                   new CheckpointMetricsBuilder()
                           .setAlignmentDurationNanos(0L)
                           .setBytesProcessedDuringAlignment(0L)
                           .setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
   
           subtaskCheckpointCoordinator.initInputsCheckpoint(
                   checkpointMetaData.getCheckpointId(), checkpointOptions);
   
           boolean success =
                   performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
           if (!success) {
               declineCheckpoint(checkpointMetaData.getCheckpointId());
           }
           return success;
       }
   ```
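The `triggerCheckpointAsync` wrapper above reports a failure twice: through the returned future, and by rethrowing into the mailbox. The toy below reproduces only that pattern. `ToyMailbox` is a hypothetical queue drained on the caller's thread, standing in for Flink's `MailboxExecutor`, which makes both reporting paths observable deterministically. Nothing completes until the mailbox actually runs the mail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class MailboxTriggerSketch {

    /** Mailbox action that may throw a checked exception. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for triggerCheckpointAsyncInMailbox. */
    interface CheckedSupplier<T> {
        T get() throws Exception;
    }

    /** Hypothetical mailbox: actions are queued, then run one by one; failures are recorded. */
    static final class ToyMailbox {
        private final Queue<ThrowingAction> mails = new ArrayDeque<>();
        final List<Exception> failures = new ArrayList<>();

        void execute(ThrowingAction action) {
            mails.add(action);
        }

        void runAll() {
            ThrowingAction next;
            while ((next = mails.poll()) != null) {
                try {
                    next.run();
                } catch (Exception e) {
                    failures.add(e); // a real mailbox would fail the task here
                }
            }
        }
    }

    static CompletableFuture<Boolean> triggerAsync(
            ToyMailbox mailbox, CheckedSupplier<Boolean> inMailbox) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(
                () -> {
                    try {
                        result.complete(inMailbox.get());
                    } catch (Exception ex) {
                        // Report the failure both via the future and to the mailbox.
                        result.completeExceptionally(ex);
                        throw ex;
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        CompletableFuture<Boolean> ok = triggerAsync(mailbox, () -> true);
        CompletableFuture<Boolean> failed =
                triggerAsync(mailbox, () -> { throw new IllegalStateException("checkpoint failed"); });

        mailbox.runAll();
        System.out.println(ok.join());                         // true
        System.out.println(failed.isCompletedExceptionally()); // true
        System.out.println(mailbox.failures.size());           // 1
    }
}
```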
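The start-delay assignment in `triggerCheckpointInRootNode` converts the gap between the checkpoint's timestamp and the task's current time from milliseconds to nanoseconds, clamped at zero. My assumption, not stated above, is that the clamp guards against clock skew. The timestamp comes from the JobManager side, while `System.currentTimeMillis()` is read on the TaskManager. The hypothetical helper below expresses the same arithmetic with `TimeUnit` instead of the literal `1_000_000` factor.

```java
import java.util.concurrent.TimeUnit;

class StartDelaySketch {

    /** Start delay in nanoseconds; 0 if the trigger timestamp lies in the local future. */
    static long startDelayNanos(long triggerTimestampMillis, long nowMillis) {
        return TimeUnit.MILLISECONDS.toNanos(Math.max(0L, nowMillis - triggerTimestampMillis));
    }

    public static void main(String[] args) {
        System.out.println(startDelayNanos(1_000L, 1_250L)); // 250000000
        System.out.println(startDelayNanos(1_250L, 1_000L)); // 0
    }
}
```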


