dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-859350238


   I thought about it quite a lot yesterday. Sorry for going back and forth 
on this issue. However, I want to be extra cautious with checkpointing. I 
checked the FLIP again and also discussed with Piotr what the best general 
approach would be for handling the triggering in those corner cases.
   
   Let me recollect what we said in the FLIP. IIRC we will introduce an 
additional `EndOfData` event which will be hand-shaken between the upstream and 
downstream task before the upstream task goes away, right? This means it is not 
possible to get an RPC `triggerCheckpoint` on a downstream task if there is any 
data in any of the channels. 
   
   Still, there might be barriers incoming through channels that have already 
reported `EndOfData`, as the upstream tasks might not have gone away yet. In the 
discussion with Piotr, we said it would be better to minimise the number of 
declined/delayed checkpoints. Having said that, I think your original proposal 
could come in handy here. 
   
   I'd still do the logic within the `StreamTask`, but I think it's a good idea 
to make the `CheckpointBarrierHandler` available here. Then we could handle the 
RPC in the following manner:
   
   ```
           // just pseudocode, we could probably do it cleaner
           List<InputGate> notFinishedInputs = getNotFinishedInputs();
           if (notFinishedInputs.isEmpty()) {
               // channels already closed, we won't see any data nor barriers
               return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
           } else {
               // we have seen the EndOfData, but there might be some barriers arriving

               // theoretically we could trigger/processBarrier for just a single
               // channel, as all channels should've seen the EndOfData and they
               // should not participate in the alignment, we care only about the
               // triggering, however for the sake of completeness I'd do it for all
               // the not finished channels

               // `barrier` is assumed to be built from checkpointMetaData and
               // checkpointOptions (not shown)
               for (InputGate inputGate : notFinishedInputs) {
                   for (InputChannelInfo channelInfo : inputGate.getNotFinishedChannels()) {
                       getCheckpointBarrierHandler().processBarrier(barrier, channelInfo);
                   }
               }
           }
   ```
   
   So this is very much like your original proposal, except that we do not 
introduce the additional Source/Non-source split and we do not need to expose 
`isRunning` or `performCheckpoint`.
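   
   To make the dispatch easier to reason about, here is a self-contained toy 
model of it. It is only a sketch: `Channel`, `Recorder`, `BarrierHandler` and 
`onTriggerCheckpointRpc` are hypothetical stand-ins, not Flink classes, and the 
handler's "trigger once per checkpoint id" behaviour is an assumption about how 
the real `CheckpointBarrierHandler` would deduplicate once no channel takes part 
in alignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the RPC handling; every type here is a hypothetical stand-in. */
class RpcTriggerSketch {

    /** A channel that has already seen EndOfData; `closed` means the upstream is gone. */
    static final class Channel {
        final String name;
        final boolean closed;

        Channel(String name, boolean closed) {
            this.name = name;
            this.closed = closed;
        }
    }

    /** Collects what happened, so the behaviour can be inspected. */
    static final class Recorder {
        final List<String> events = new ArrayList<>();
    }

    /** Assumed handler behaviour: trigger once per checkpoint id, ignore repeats. */
    static final class BarrierHandler {
        private final Recorder recorder;
        private long lastTriggeredId = -1;

        BarrierHandler(Recorder recorder) {
            this.recorder = recorder;
        }

        void processBarrier(long checkpointId, Channel channel) {
            recorder.events.add("barrier " + checkpointId + " on " + channel.name);
            if (checkpointId > lastTriggeredId) {
                lastTriggeredId = checkpointId;
                recorder.events.add("checkpoint " + checkpointId + " via handler");
            }
        }
    }

    /** Mirrors the two branches of the pseudocode for an incoming trigger RPC. */
    static void onTriggerCheckpointRpc(
            long checkpointId, List<Channel> channels, BarrierHandler handler, Recorder recorder) {
        List<Channel> notFinished = new ArrayList<>();
        for (Channel c : channels) {
            if (!c.closed) {
                notFinished.add(c);
            }
        }
        if (notFinished.isEmpty()) {
            // All channels closed: no data or barriers can arrive, trigger directly.
            recorder.events.add("checkpoint " + checkpointId + " via root");
            return;
        }
        // Some upstream tasks are still around: go through the handler so that a
        // real barrier arriving later for the same id is not triggered twice.
        for (Channel c : notFinished) {
            handler.processBarrier(checkpointId, c);
        }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        BarrierHandler handler = new BarrierHandler(recorder);
        Channel open = new Channel("b", false);
        onTriggerCheckpointRpc(2L, Arrays.asList(new Channel("a", true), open), handler, recorder);
        handler.processBarrier(2L, open); // late barrier from the still-running upstream
        recorder.events.forEach(System.out::println);
    }
}
```

   In this model the late barrier in `main` is recorded but does not start a 
second checkpoint, which is the reason for routing the RPC through the handler 
rather than triggering directly.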
   
   WDYT?



