kezhuw commented on pull request #14831: URL: https://github.com/apache/flink/pull/14831#issuecomment-782599993
@gaoyunhaii @pnowojski I have some immature and unverified thoughts.

I think handling `EndOfPartitionEvent` during a checkpoint and overtaking buffers before `EndOfPartitionEvent` are largely orthogonal and can coexist. We will encounter `EndOfPartitionEvent` during a checkpoint regardless, and after FLIP-147 canceling the checkpoint in that case should be avoided. But instead of "Insert barriers for channels received EndOfPartition", I would suggest counting `EndOfPartitionEvent` directly in the checkpoint barrier handler. That is, in the implementation, we could count toward or trigger the checkpoint in `CheckpointBarrierHandler.processEndOfPartition` and `ChannelStatePersister.checkForBarrier` if there is a pending checkpoint. Personally, I think this is equivalent to `FinalizeBarrierComplementProcessor` but much more straightforward. The point is that if we decide to support `EndOfPartitionEvent` during a checkpoint, we should build this knowledge directly into `CheckpointBarrierHandler`.

To overtake buffers before `EndOfPartitionEvent` for unaligned checkpoints, I think the minimal requirement is flushing all buffers to the network stack before `EndOfPartitionEvent`. An asynchronous, completable flush operation on `PipelinedSubpartition` should meet this requirement. Until that flush completes, an unaligned checkpoint can take place as normal; once it completes, there are no output buffers left to overtake. A request-response pair of events (e.g. `EndOfXyzEvent`, `XyzConsumedEvent`) would also fulfill this requirement. I am not sure how viable it is to introduce `FINISHING` for checkpointing, since I think `ExecutionState` is handled by `Task` while checkpointing is handled inside `StreamTask`. But I think "how to overtake buffers for unaligned checkpoint before `EndOfPartitionEvent`" could be a separate issue.

Besides this, I think a checkpoint after `EndOfInputEvent` (e.g. at `FINISHING` or once all buffers are flushed) is similar to the last checkpoint for 2PC.
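To make the idea of counting `EndOfPartitionEvent` in the barrier handler a bit more concrete, here is a rough, unverified sketch. It does not use any Flink classes: `EndOfPartitionAwareAligner` and all of its methods are hypothetical names, and channel state persistence and unaligned checkpoints are left out entirely. The only point is that a finished channel counts as aligned for the pending checkpoint, so no barrier has to be inserted for it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a barrier handler; NOT Flink's CheckpointBarrierHandler. */
final class EndOfPartitionAwareAligner {
    private final int numChannels;
    // Channels that have delivered EndOfPartition; they count as aligned for every later checkpoint.
    private final Set<Integer> finishedChannels = new HashSet<>();
    // Channels that have delivered the barrier of the pending checkpoint.
    private final Set<Integer> alignedChannels = new HashSet<>();
    private long pendingCheckpointId = -1L;
    private long lastTriggeredCheckpointId = -1L;

    EndOfPartitionAwareAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Called when a barrier for checkpointId arrives on the given channel. */
    void processBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastTriggeredCheckpointId || checkpointId < pendingCheckpointId) {
            return; // stale barrier, ignore
        }
        if (checkpointId > pendingCheckpointId) {
            // A newer checkpoint supersedes the pending one (assumption of this sketch).
            pendingCheckpointId = checkpointId;
            alignedChannels.clear();
        }
        alignedChannels.add(channel);
        maybeTrigger();
    }

    /** Called when EndOfPartition arrives; counted instead of cancelling the checkpoint. */
    void processEndOfPartition(int channel) {
        finishedChannels.add(channel);
        maybeTrigger();
    }

    private void maybeTrigger() {
        if (pendingCheckpointId < 0) {
            return; // nothing pending, nothing to trigger
        }
        Set<Integer> done = new HashSet<>(alignedChannels);
        done.addAll(finishedChannels);
        if (done.size() == numChannels) {
            // A real handler would notify the task here; the sketch only records the id.
            lastTriggeredCheckpointId = pendingCheckpointId;
            pendingCheckpointId = -1L;
            alignedChannels.clear();
        }
    }

    long lastTriggeredCheckpointId() {
        return lastTriggeredCheckpointId;
    }
}
```

With three channels, a barrier on channel 0, `EndOfPartition` on channel 1 and a barrier on channel 2 would be enough to complete alignment in this sketch.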
I am somewhat worried about buffer duplication after recovering from a successful checkpoint created in that period. A checkpoint in that period will include the last buffer from a possible end-of-stream flush operation; after recovery, will that end-of-stream flush still be executed?

I saw there are other JIRAs in this PR or in planned PRs, so I want to write down my thoughts on them as well:

* FLINK-21085: I think the current solution will undermine what we are trying to achieve in FLINK-21133. In that JIRA, we try to unify the handling of stop-with-savepoint in one place, while the current approach tends to duplicate checkpoint triggering across many code paths. I would suggest enhancing `MultipleInputStreamTask.triggerCheckpointAsync` and generalizing it to `StreamTask`. This way we will have only one place that triggers checkpoints.
* FLINK-21081: Just another approach to list, consider, and evaluate. Maybe we could send the checkpoint trigger RPC to all running tasks? If a task with active inputs receives a checkpoint trigger RPC, it just records it. If all active inputs reach `EndOfPartitionEvent` in the meantime, the task triggers the checkpoint itself. If checkpoint triggering is coded in one place, this bookkeeping and lazy triggering only needs to be coded once. But this approach may be network-intensive in large job topologies.

@pnowojski I am not sure how the finishing handshake will be delivered, so I am also not sure how this handshake will combine with stop-with-savepoint. But if the handshake happens at the `StreamTask` level, I think it will be no big deal. Still, I am unsure.
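For the FLINK-21081 idea of recording trigger RPCs and triggering lazily, a minimal sketch could look like the following. Again, this is only an illustration under my own assumptions: `LazyCheckpointTrigger` and its methods are hypothetical names, nothing here is Flink API, and dropping recorded ids once the checkpoint completes through barriers is a guess at what the bookkeeping would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical per-task bookkeeping for checkpoint trigger RPCs; NOT a Flink class. */
final class LazyCheckpointTrigger {
    private int activeInputs;
    // Trigger RPCs received while some inputs were still active, ordered by checkpoint id.
    private final TreeSet<Long> recordedTriggers = new TreeSet<>();
    // Checkpoints the task decided to trigger on its own.
    private final List<Long> selfTriggered = new ArrayList<>();

    LazyCheckpointTrigger(int activeInputs) {
        this.activeInputs = activeInputs;
    }

    /** The coordinator sends the trigger RPC to every running task. */
    void onTriggerRpc(long checkpointId) {
        if (activeInputs == 0) {
            selfTriggered.add(checkpointId); // no upstream barriers will come, trigger now
        } else {
            recordedTriggers.add(checkpointId); // barriers are expected, just record it
        }
    }

    /** Assumption: a checkpoint completed via barriers no longer needs a lazy trigger. */
    void onCheckpointHandledByBarriers(long checkpointId) {
        recordedTriggers.headSet(checkpointId, true).clear();
    }

    /** One active input reached EndOfPartition. */
    void onEndOfPartition() {
        if (activeInputs == 0) {
            throw new IllegalStateException("no active inputs left");
        }
        activeInputs--;
        if (activeInputs == 0) {
            // All inputs finished in the meantime: trigger everything still recorded.
            selfTriggered.addAll(recordedTriggers);
            recordedTriggers.clear();
        }
    }

    List<Long> selfTriggered() {
        return new ArrayList<>(selfTriggered);
    }
}
```

If this lived in the single trigger path in `StreamTask`, the bookkeeping would only have to be written once, which is the main reason I bring it up.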
