kezhuw commented on pull request #14831:
URL: https://github.com/apache/flink/pull/14831#issuecomment-782599993


   @gaoyunhaii @pnowojski I have some immature and unverified thoughts.
   
   I think handling `EndOfPartitionEvent` during a checkpoint and overtaking 
buffers before `EndOfPartitionEvent` are largely orthogonal and can coexist. 
We will encounter `EndOfPartitionEvent` during checkpoints regardless, and 
canceling the checkpoint in this case should be avoided after FLIP-147.
   
   But instead of "Insert barriers for channels received EndOfPartition", I 
would suggest counting `EndOfPartitionEvent` directly in the checkpoint barrier 
handler. That is, in the implementation, we could count it and trigger the 
checkpoint in `CheckpointBarrierHandler.processEndOfPartition` and 
`ChannelStatePersister.checkForBarrier` if there is a pending checkpoint. 
Personally, I think this is equivalent to `FinalizeBarrierComplementProcessor` 
but much more straightforward. The point is that if we decide to support 
`EndOfPartitionEvent` during checkpoints, we should build this knowledge 
directly into `CheckpointBarrierHandler`.
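   A minimal sketch of the counting idea, assuming only one pending checkpoint 
at a time and channels identified by index. `EndOfPartitionAwareBarrierCounter` 
and its methods are hypothetical and are not Flink's `CheckpointBarrierHandler` 
API. The only point is that a channel which has already delivered 
`EndOfPartitionEvent` counts as aligned, so the checkpoint can complete without 
inserting synthetic barriers for it.

```java
import java.util.BitSet;

/**
 * Hypothetical sketch, not Flink's CheckpointBarrierHandler: a channel that has
 * already delivered EndOfPartitionEvent counts as aligned for the pending checkpoint.
 */
class EndOfPartitionAwareBarrierCounter {
    private static final long NONE = -1L;

    private final int numChannels;
    private final BitSet barrierReceived = new BitSet();
    private final BitSet finishedChannels = new BitSet();
    private long pendingCheckpointId = NONE;
    private long lastTriggeredCheckpointId = NONE;

    EndOfPartitionAwareBarrierCounter(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A barrier for checkpointId arrived on the given channel. */
    void processBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastTriggeredCheckpointId || checkpointId < pendingCheckpointId) {
            return; // stale barrier of an already triggered or subsumed checkpoint
        }
        if (checkpointId > pendingCheckpointId) {
            pendingCheckpointId = checkpointId; // a newer checkpoint starts
            barrierReceived.clear();
        }
        barrierReceived.set(channel);
        maybeTrigger();
    }

    /** EndOfPartitionEvent arrived: this channel will never send a barrier again. */
    void processEndOfPartition(int channel) {
        finishedChannels.set(channel);
        if (pendingCheckpointId != NONE) {
            maybeTrigger(); // the finished channel may have been the last one missing
        }
    }

    long lastTriggeredCheckpointId() {
        return lastTriggeredCheckpointId;
    }

    private void maybeTrigger() {
        BitSet aligned = (BitSet) barrierReceived.clone();
        aligned.or(finishedChannels); // finished channels count as aligned
        if (aligned.cardinality() == numChannels) {
            lastTriggeredCheckpointId = pendingCheckpointId; // "trigger" the checkpoint
            pendingCheckpointId = NONE;
            barrierReceived.clear();
        }
    }
}
```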
   
   To let unaligned checkpoints overtake buffers before `EndOfPartitionEvent`, 
I think the minimal requirement is flushing all buffers to the network stack 
before `EndOfPartitionEvent`. An asynchronous, completable flush operation on 
`PipelinedSubpartition` should meet this requirement. Before that flush 
operation completes, unaligned checkpoints can take place as normal; after it 
completes, there are no output buffers left to overtake. Alternatively, a pair 
of request-response events (e.g. `EndOfXyzEvent`, `XyzConsumedEvent`) would also 
fulfill this requirement. I am not sure how viable it is to introduce 
`FINISHING` for checkpointing, as I think `ExecutionState` is handled by `Task` 
while checkpointing is handled inside `StreamTask`. But I think "how to overtake 
buffers for unaligned checkpoints before `EndOfPartitionEvent`" could be a 
separate issue.
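   To illustrate the flush requirement, here is a rough sketch in which buffers 
are modeled as strings. `FlushableSubpartition`, `flushAllAsync` and 
`snapshotInFlight` are hypothetical names, not methods of Flink's 
`PipelinedSubpartition`. While the returned future is pending, an unaligned 
checkpoint can still persist the queued buffers as in-flight state; once it 
completes, there is nothing left to overtake.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch of an asynchronous, completable flush on a subpartition.
 * Not Flink's PipelinedSubpartition API; buffers are modeled as strings.
 */
class FlushableSubpartition {
    private final ArrayDeque<String> queued = new ArrayDeque<>();
    private CompletableFuture<Void> flushFuture; // null until the final flush is requested

    synchronized void add(String buffer) {
        if (flushFuture != null) {
            throw new IllegalStateException("no new buffers after the final flush was requested");
        }
        queued.add(buffer);
    }

    /** Completes once every queued buffer has been handed to the network stack. */
    synchronized CompletableFuture<Void> flushAllAsync() {
        if (flushFuture == null) {
            flushFuture = new CompletableFuture<>();
            completeIfDrained();
        }
        return flushFuture;
    }

    /** Network side: takes the next buffer, or returns null if none is queued. */
    synchronized String poll() {
        String next = queued.poll();
        completeIfDrained();
        return next;
    }

    /** Unaligned checkpoint: queued buffers are overtaken and persisted as in-flight state. */
    synchronized List<String> snapshotInFlight() {
        return new ArrayList<>(queued);
    }

    private void completeIfDrained() {
        if (flushFuture != null && queued.isEmpty()) {
            flushFuture.complete(null); // no-op if already completed
        }
    }
}
```

   In this model, `EndOfPartitionEvent` would only be emitted from a callback 
such as `flushAllAsync().thenRun(...)`, so it can never sit behind buffers that 
a checkpoint would need to overtake.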
   
   Besides this, I think a checkpoint after `EndOfInputEvent` (e.g. at 
`FINISHING`, or after all buffers are flushed) is similar to the last checkpoint 
for 2PC. I am somewhat worried about buffer duplication after recovering from a 
successful checkpoint created in that period. A checkpoint in that period will 
include the last buffer from a possible end-of-stream flush operation; after 
recovery, will that end-of-stream flush still be executed?
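   A toy model of this concern, assuming that a checkpoint taken after the 
end-of-stream flush captures the flushed buffer as part of its state. 
`EndOfStreamFlushModel` is hypothetical: if the "already flushed" flag is not 
part of the snapshot, the restored operator flushes again and the final buffer 
appears twice, while persisting the flag avoids it. Whether Flink's recovery 
would actually re-run the flush is exactly the open question above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of an operator whose end-of-input handling emits one
 * final buffer. It only illustrates the duplication risk; it is not Flink code.
 */
class EndOfStreamFlushModel {
    final List<String> output = new ArrayList<>(); // buffers emitted downstream (captured by the checkpoint)
    boolean finalFlushDone = false;

    /** End-of-stream flush: emits the final buffer unless it was already emitted. */
    void endOfInput() {
        if (!finalFlushDone) {
            output.add("final-buffer");
            finalFlushDone = true;
        }
    }

    /** Checkpoint taken after the flush; persistFlushFlag decides whether the flag is stored. */
    Snapshot snapshot(boolean persistFlushFlag) {
        return new Snapshot(new ArrayList<>(output), persistFlushFlag && finalFlushDone);
    }

    /** Recovery: the captured buffers are restored, and endOfInput() may run again. */
    static EndOfStreamFlushModel restore(Snapshot snapshot) {
        EndOfStreamFlushModel restored = new EndOfStreamFlushModel();
        restored.output.addAll(snapshot.output);
        restored.finalFlushDone = snapshot.finalFlushDone;
        return restored;
    }

    static final class Snapshot {
        final List<String> output;
        final boolean finalFlushDone;

        Snapshot(List<String> output, boolean finalFlushDone) {
            this.output = output;
            this.finalFlushDone = finalFlushDone;
        }
    }
}
```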
   
   I saw there are other JIRAs in this PR or in planned PRs, so I want to write 
down my thoughts on them as well:
   * FLINK-21085: I think the current solution will undermine what we are 
trying to achieve in FLINK-21133. In that JIRA, we try to unify the handling of 
stop-with-savepoint in one place, while the current approach tends to 
duplicate checkpoint triggering across many code paths. I would suggest 
enhancing and generalizing `MultipleInputStreamTask.triggerCheckpointAsync` to 
`StreamTask`. This way we will have only one place to trigger checkpoints.
   * FLINK-21081: Just yet another approach to list/consider/evaluate. Maybe 
we could send the checkpoint trigger RPC to all running tasks? If a task with 
active inputs receives the checkpoint trigger RPC, it just bookkeeps it. If all 
its active inputs reach `EndOfPartitionEvent` in the meantime, the task 
triggers the checkpoint itself. If checkpoint triggering is coded in one place, 
this bookkeeping and lazy triggering will only be coded once, in one place. But 
this approach may be network-consuming in a large job topology.
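   A rough sketch combining both suggestions: a single checkpoint-trigger entry 
point plus the lazy-trigger bookkeeping described for FLINK-21081. 
`LazyTriggerTask` and its methods are hypothetical, not `StreamTask`'s API, and 
barrier alignment is reduced to a single callback to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not Flink's StreamTask: every trigger path funnels into
 * performCheckpoint(), and trigger RPCs received while inputs are still active
 * are only bookkept until either barriers align or all inputs finish.
 */
class LazyTriggerTask {
    private final boolean[] inputFinished;
    private int finishedInputs = 0;
    private final List<Long> deferredCheckpoints = new ArrayList<>();
    final List<Long> triggeredCheckpoints = new ArrayList<>();

    LazyTriggerTask(int numInputs) {
        this.inputFinished = new boolean[numInputs];
    }

    /** Checkpoint trigger RPC, assumed here to be sent to every running task. */
    void triggerCheckpointRpc(long checkpointId) {
        if (finishedInputs == inputFinished.length) {
            performCheckpoint(checkpointId); // no active inputs: trigger immediately, like a source
        } else {
            deferredCheckpoints.add(checkpointId); // active inputs: just bookkeep it
        }
    }

    /** Barriers for the checkpoint arrived on all active inputs. */
    void onBarriersAligned(long checkpointId) {
        deferredCheckpoints.remove(Long.valueOf(checkpointId));
        performCheckpoint(checkpointId);
    }

    /** EndOfPartitionEvent received on the given input. */
    void onEndOfPartition(int input) {
        if (!inputFinished[input]) {
            inputFinished[input] = true;
            finishedInputs++;
        }
        if (finishedInputs == inputFinished.length) {
            // All inputs finished: lazily trigger every bookkept checkpoint.
            for (long id : deferredCheckpoints) {
                performCheckpoint(id);
            }
            deferredCheckpoints.clear();
        }
    }

    /** The single place where a checkpoint is actually triggered. */
    private void performCheckpoint(long checkpointId) {
        triggeredCheckpoints.add(checkpointId);
    }
}
```

   Because the lazy-trigger logic sits next to the single entry point, it is 
written once instead of in every code path that can observe 
`EndOfPartitionEvent`.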
   
   @pnowojski I am not sure how the finishing handshake will be delivered, so 
basically, I am also not sure how this handshake will combine with 
stop-with-savepoint. But if the handshake happens at the `StreamTask` level, I 
think it will not be a big deal. Still, I am unsure.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
