dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-859350238
I thought about it quite a lot yesterday. Sorry for going back and forth
with the issue. However, I want to be extra cautious with the checkpointing. I
checked the FLIP again and also discussed with Piotr what would be the best
general approach for handling the triggering in those corner cases.
Let me recollect what we said in the FLIP. IIRC, we will introduce an
additional `EndOfData` event that is exchanged in a handshake between the
upstream and downstream task before the upstream task goes away, right? This
means it is not possible to get an RPC `triggerCheckpoint` on a downstream task
if there is still any data in any of the channels.
Still, barriers might come in through channels that have already
reported `EndOfData`, as the upstream tasks might not have gone away yet. In the
discussion with Piotr, we said it would be better to minimise the number of
declined/delayed checkpoints. Having said that, I think your original proposal
could come in handy here.
I'd still do the logic within the `StreamTask`, but I think it's a good idea
to make the `CheckpointBarrierHandler` available here. Then we could handle the
RPC in the following manner:
```
// just pseudocode, we could probably do it cleaner
// `barrier` stands for the barrier of the checkpoint being triggered
List<InputGate> notFinishedInputs = getNotFinishedInputs();
if (notFinishedInputs.isEmpty()) {
    // channels already closed, we won't see any data nor barriers
    return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
} else {
    // we have seen the EndOfData, but there might be some barriers arriving
    //
    // theoretically we could trigger/processBarrier for just a single channel,
    // as all channels should've seen the EndOfData and they should not
    // participate in the alignment, we care only about the triggering; however,
    // for the sake of completeness I'd do it for all the not finished channels
    for (InputGate inputGate : notFinishedInputs) {
        for (InputChannelInfo channelInfo : inputGate.getNotFinishedChannels()) {
            getCheckpointBarrierHandler().processBarrier(barrier, channelInfo);
        }
    }
}
```
So it's very much like your original proposal, except that we do not
introduce the additional Source/Non-source split and we do not need to expose
`isRunning` or `performCheckpoint`.
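To make the branching easier to reason about (and to unit test), here is a rough, self-contained sketch of the same decision. Everything in it is a stand-in I made up for illustration: `RpcTriggerSketch`, `Channel`, `Gate`, `onRpcTrigger` and the recorded action strings are not Flink classes or methods, and the real `StreamTask` would call the root-node trigger and the `CheckpointBarrierHandler` instead of appending to a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical stand-ins, not Flink's real types.
class RpcTriggerSketch {

    /** A channel that has already seen EndOfData; `finished` means it is closed. */
    static final class Channel {
        final String name;
        final boolean finished;

        Channel(String name, boolean finished) {
            this.name = name;
            this.finished = finished;
        }
    }

    /** An input gate that can list the channels that are not closed yet. */
    static final class Gate {
        final List<Channel> channels;

        Gate(List<Channel> channels) {
            this.channels = channels;
        }

        List<Channel> notFinishedChannels() {
            List<Channel> result = new ArrayList<>();
            for (Channel channel : channels) {
                if (!channel.finished) {
                    result.add(channel);
                }
            }
            return result;
        }
    }

    /** Records the decisions taken, in place of the real trigger / handler calls. */
    final List<String> actions = new ArrayList<>();

    /** Models the handling of an RPC checkpoint trigger on a task. */
    void onRpcTrigger(long checkpointId, List<Gate> gates) {
        List<Gate> notFinishedInputs = new ArrayList<>();
        for (Gate gate : gates) {
            if (!gate.notFinishedChannels().isEmpty()) {
                notFinishedInputs.add(gate);
            }
        }

        if (notFinishedInputs.isEmpty()) {
            // All channels are closed: neither data nor barriers can arrive,
            // so the task triggers the checkpoint itself.
            actions.add("triggerInRootNode:" + checkpointId);
            return;
        }

        // Some channels are still open and barriers might arrive through them,
        // so hand a barrier to the handler for every not finished channel.
        for (Gate gate : notFinishedInputs) {
            for (Channel channel : gate.notFinishedChannels()) {
                actions.add("processBarrier:" + checkpointId + "@" + channel.name);
            }
        }
    }

    public static void main(String[] args) {
        RpcTriggerSketch task = new RpcTriggerSketch();
        Gate allClosed = new Gate(Arrays.asList(new Channel("a", true)));
        Gate oneOpen = new Gate(Arrays.asList(new Channel("b", true), new Channel("c", false)));

        task.onRpcTrigger(1L, Arrays.asList(allClosed));
        task.onRpcTrigger(2L, Arrays.asList(allClosed, oneOpen));
        System.out.println(task.actions);
    }
}
```

The point of the sketch is only that a single code path covers both cases, without a Source/Non-source distinction: a task with no inputs at all simply falls into the first branch.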
WDYT?