pnowojski commented on pull request #14831: URL: https://github.com/apache/flink/pull/14831#issuecomment-774642652
Thanks for the explanation @gaoyunhaii . I've thought about what you had written, and I still have some concerns.

> If both A1 and A2 received barrier after EndOfPartition, then task B would only receive EndOfPartitions and would not know any thing about this checkpoint. JM also could not detect this situation since all the checkpoint RPC triggers are success. This would cause checkpoint timeout and then cause job failover.

Why would the RPC triggers need to be successful? We can modify the logic in `StreamTask` to fail them as we wish (more on this later).

> The second concern is about letting the barriers to "chase after" EndOfPartition for unaligned checkpoint case

But here you are not solving this problem: unaligned checkpoints in your proposal are not able to overtake the remaining output buffers. You are adding quite a bit more complexity and special handling on the input side to partially support unaligned checkpoints, while in the long term we would still need to keep the upstream task alive. Your solution doesn't bring us closer to that, while it adds extra complexity that won't be needed in the final version, right?

For me, modifying the closing protocol to add some kind of handshake would be a more generic and cleaner solution. It would not affect aligned/unaligned checkpoints, apart from one minor thing: we might need to persist some events (like `EndOfPartitionEvent`) if they are overtaken by an unaligned `CheckpointBarrier`. As I said at the beginning, adding code/complexity to `StreamTask`/`Task`, or even to the closing protocol, is much, much safer compared to altering the behaviour of the alignment (the caching logic in `LocalInputChannel` or `#insertBarrierBeforeEndOfPartition`).

As I see it, the alternative approach would be something like this:

1. Introduce a two-step handshake when closing channels: first the upstream sends `EndOfPartitionEvent`; the downstream, upon processing it, sends `PartitionConsumedEvent`; the upstream then sends `CloseEvent`.
2. The downstream should expect some events, like `CheckpointBarrier`, to be sent after the `EndOfPartitionEvent`. `CloseEvent` would let the downstream know it can safely release all of its resources.
3. After sending `EndOfPartitionEvent`, the upstream would be in a `FINISHING` state. It can still handle checkpoint RPCs.
4. After sending `CloseEvent`, the upstream would switch to a `FINISHED` state and reject all checkpoint RPCs.
5. If a checkpoint trigger RPC is rejected because the task switched to `FINISHED`, the `CheckpointCoordinator` should take that into account and redirect the RPC to all tasks downstream from the `FINISHED` one. I assume you need this behaviour also in the current version? @gaoyunhaii what are you doing if there is a race condition and some `Task` finishes while a trigger checkpoint RPC is already on its way to it?

> Besides, the upstream task need to do a reference count about how much exit events are received from the downstream tasks

We are already doing that (`PipelinedResultPartition#numUnconsumedSubpartitions`).
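The proposed handshake could be sketched as a small state machine on the upstream side, combining the `FINISHING`/`FINISHED` states with the existing reference count of unconsumed subpartitions. This is only an illustrative sketch under assumed names (`Upstream`, `UpstreamState`, `onPartitionConsumed` are all hypothetical, not Flink's actual `Task`/`ResultPartition` APIs):

```java
// Hypothetical sketch of the two-step closing handshake. All names here
// are illustrative assumptions, not real Flink classes or methods.

enum UpstreamState { RUNNING, FINISHING, FINISHED }

class Upstream {
    UpstreamState state = UpstreamState.RUNNING;
    // Analogous to PipelinedResultPartition#numUnconsumedSubpartitions.
    private int unconsumedSubpartitions;

    Upstream(int downstreamConsumers) {
        this.unconsumedSubpartitions = downstreamConsumers;
    }

    // Steps 1 and 3: send EndOfPartitionEvent and enter FINISHING;
    // checkpoint RPCs are still accepted in this state.
    void sendEndOfPartition() {
        state = UpstreamState.FINISHING;
    }

    // Steps 2 and 4: each PartitionConsumedEvent from a downstream consumer
    // decrements the reference count; once all consumers have confirmed,
    // the upstream sends CloseEvent and switches to FINISHED.
    void onPartitionConsumed() {
        if (state == UpstreamState.FINISHING && --unconsumedSubpartitions == 0) {
            state = UpstreamState.FINISHED; // CloseEvent would be sent here
        }
    }

    // Step 5: once FINISHED, the trigger RPC is rejected, which tells the
    // CheckpointCoordinator to redirect the trigger to downstream tasks.
    boolean acceptsCheckpointTrigger() {
        return state != UpstreamState.FINISHED;
    }
}

public class ClosingHandshakeSketch {
    public static void main(String[] args) {
        Upstream up = new Upstream(2);
        up.sendEndOfPartition();
        System.out.println(up.state + " accepts trigger: " + up.acceptsCheckpointTrigger());
        up.onPartitionConsumed(); // first downstream confirms
        up.onPartitionConsumed(); // second downstream confirms
        System.out.println(up.state + " accepts trigger: " + up.acceptsCheckpointTrigger());
    }
}
```

The point of the sketch is that the `FINISHING` window is exactly where barriers may still "chase" the `EndOfPartitionEvent`, while `FINISHED` gives the coordinator an unambiguous signal to redirect the trigger.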
