gaoyunhaii commented on pull request #14831: URL: https://github.com/apache/flink/pull/14831#issuecomment-774166223
Hi @pnowojski, many thanks for the review and the deep thoughts! Let me first address the general concerns.

For the first issue, Roman and I also discussed it previously. We think we might solve it by making the upstream tasks wait until all the buffers have been flushed out of the sub-partitions before they finish. During this period the upstream task would still be running, so we could perform unaligned checkpoints normally.

I also think that holding the upstream tasks until all the remaining data is consumed is a good direction that might simplify the issue. However, I found that some details might not be easy to implement. For convenience, suppose we have one task B with two upstream tasks, A1 and A2.

The first concern is that, in the aligned checkpoint case, the upstream tasks do not forward barriers to the downstream tasks once `EndOfPartition` has been sent:

1. If both A1 and A2 receive the barrier only after they have sent `EndOfPartition`, task B would only receive `EndOfPartition` events and would know nothing about this checkpoint. The JM could not detect this situation either, since all the checkpoint-triggering RPCs succeed. The checkpoint would then time out and cause a job failover.

The second concern is about letting the barriers "chase after" `EndOfPartition` in the unaligned checkpoint case:

1. Since `EndOfPartition` is sent in `Task` instead of `StreamTask`, by the time it is sent the `StreamTask` has already finished its lifecycle, and the `SubtaskCheckpointCoordinator` has been closed in `StreamTask#cleanupInvoke()`. Because the `ChannelStateWriter` is closed, we could not snapshot the result partition.
2. The issue from the aligned checkpoint case still exists: if both A1 and A2 receive the barrier after `EndOfPartition`, the barriers might not overtake `EndOfPartition`, and B would have no idea about this checkpoint.
3. Aligned checkpoints with timeout are still not supported, since in that mode the barrier is not prioritized and cannot go past `EndOfPartition`.
4. Sending events after `EndOfPartition` might break some assumptions of the network stack. We would need to add checks along the sub-partition and the input channel to handle a barrier that arrives after the channel has already been closed due to `EndOfPartition`.

There are also some implementation-related issues. These should be solvable, but might introduce some complexity:

1. We would need to introduce a new event for the downstream task to notify the upstream task to exit. We could not reuse `CloseRequest`, since it works at the connection level and multiple tasks might share the same connection.
2. Besides, the upstream task would need to keep a reference count of how many exit events it has received from the downstream tasks.
3. Since the `EndOfPartition` event is sent in `Task`, we would also need some way to avoid affecting `BatchTask`. We might need to add a marker interface to `AbstractInvokable`.
4. We would still need to make some modifications to the downstream tasks. Suppose A1 is running and A2 has finished: when B receives an unaligned checkpoint barrier from A1, it might decide to trigger the checkpoint and snapshot the buffers from A2. Thus `RemoteInputChannel` would still need to be modified to consider `EndOfPartition` in its channel persistence logic.
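To make the first aligned-checkpoint concern concrete, here is a toy model. It is a hypothetical sketch rather than Flink code (Java 16+ for records): `AlignmentModel`, `Kind`, and `Event` are illustrative names, and each input channel of B is reduced to the ordered list of events B reads from it, with nothing read after `EndOfPartition`. It shows that B only learns about a checkpoint if at least one upstream task forwarded the barrier before finishing.

```java
import java.util.List;

/**
 * Hypothetical toy model (not Flink's network stack) of task B reading
 * events from the input channels of its upstream tasks A1 and A2.
 */
public class AlignmentModel {

    enum Kind { DATA, BARRIER, END_OF_PARTITION }

    record Event(Kind kind, long checkpointId) {
        static Event data() { return new Event(Kind.DATA, -1L); }
        static Event barrier(long id) { return new Event(Kind.BARRIER, id); }
        static Event endOfPartition() { return new Event(Kind.END_OF_PARTITION, -1L); }
    }

    /**
     * Returns true if B reads a barrier for {@code checkpointId} on any channel.
     * A channel is finished once EndOfPartition is read, so anything queued
     * behind it (for example a late barrier) is never seen by B.
     */
    static boolean downstreamSeesCheckpoint(List<List<Event>> channels, long checkpointId) {
        for (List<Event> channel : channels) {
            for (Event e : channel) {
                if (e.kind() == Kind.END_OF_PARTITION) {
                    break; // channel finished; stop reading it
                }
                if (e.kind() == Kind.BARRIER && e.checkpointId() == checkpointId) {
                    return true; // B becomes aware of the checkpoint
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long cp = 7L;
        // A1 forwarded the barrier before finishing; A2 had already finished.
        List<List<Event>> oneBarrier = List.of(
                List.of(Event.data(), Event.barrier(cp), Event.endOfPartition()),
                List.of(Event.data(), Event.endOfPartition()));
        // Both A1 and A2 got the barrier only after sending EndOfPartition.
        List<List<Event>> noBarrier = List.of(
                List.of(Event.data(), Event.endOfPartition()),
                List.of(Event.endOfPartition()));
        System.out.println(downstreamSeesCheckpoint(oneBarrier, cp)); // true
        System.out.println(downstreamSeesCheckpoint(noBarrier, cp));  // false
    }
}
```

In the second case nothing on B's side ever refers to checkpoint 7, which is why only the checkpoint timeout can end it.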
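The reference counting from the second implementation-related issue could look roughly like the following. This is a hypothetical sketch, assuming a new per-subtask exit event (not an existing Flink event) delivered to the upstream task after it has emitted `EndOfPartition`; `ExitEventTracker` and its methods are illustrative names. The count is kept per consumer subtask rather than per connection, which mirrors why `CloseRequest` cannot be reused: one connection may carry several tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not an existing Flink class): the upstream task keeps
 * running after EndOfPartition and counts one exit event per downstream
 * consumer subtask before it is allowed to finish.
 */
public class ExitEventTracker {

    private final boolean[] acknowledged; // one slot per consumer subtask
    private final CountDownLatch remaining;

    public ExitEventTracker(int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("numConsumers must be positive");
        }
        this.acknowledged = new boolean[numConsumers];
        this.remaining = new CountDownLatch(numConsumers);
    }

    /** Records the exit event of one consumer; returns false for a duplicate. */
    public synchronized boolean onExitEvent(int consumerIndex) {
        if (acknowledged[consumerIndex]) {
            return false;
        }
        acknowledged[consumerIndex] = true;
        remaining.countDown();
        return true;
    }

    /** Number of consumers that have not yet sent their exit event. */
    public long pendingConsumers() {
        return remaining.getCount();
    }

    /** True once every consumer has sent its exit event. */
    public boolean canFinish() {
        return remaining.getCount() == 0;
    }

    /** Blocks until all consumers have exited or the timeout expires. */
    public boolean awaitAllConsumers(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExitEventTracker tracker = new ExitEventTracker(2);
        // Two consumer subtasks that may share a single network connection.
        Thread consumers = new Thread(() -> {
            tracker.onExitEvent(0);
            tracker.onExitEvent(1);
        });
        consumers.start();
        boolean done = tracker.awaitAllConsumers(5, TimeUnit.SECONDS);
        consumers.join();
        System.out.println("upstream may finish: " + done);
    }
}
```

Duplicate exit events are ignored, so a repeated notification from one consumer cannot release the upstream task while another consumer is still reading.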
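The last implementation-related point can be illustrated with another toy model. It is a hypothetical sketch (Java 16+), not the actual `RemoteInputChannel` logic: `ChannelPersistenceModel`, `Element`, and `PersistResult` are illustrative names, and it assumes that persisting a channel's in-flight data for an unaligned checkpoint normally ends at that channel's own barrier. For the channel from A2, which has already finished, that barrier never arrives, so `EndOfPartition` has to end persistence as well.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not RemoteInputChannel itself) of which in-flight
 * buffers one input channel of B persists for an unaligned checkpoint that
 * was triggered by a barrier arriving on another channel.
 */
public class ChannelPersistenceModel {

    interface Element {}
    record Buffer(String payload) implements Element {}
    record Barrier(long checkpointId) implements Element {}
    record EndOfPartition() implements Element {}

    /** Buffers to persist, and whether persistence of this channel is complete. */
    record PersistResult(List<String> buffers, boolean complete) {}

    /**
     * Collects the queued buffers for {@code checkpointId}. Persistence ends at
     * this channel's own barrier, or at EndOfPartition if the upstream task has
     * finished; without the EndOfPartition check it would never complete.
     */
    static PersistResult persistChannelState(List<Element> queued, long checkpointId) {
        List<String> buffers = new ArrayList<>();
        for (Element e : queued) {
            if (e instanceof Barrier b && b.checkpointId() == checkpointId) {
                return new PersistResult(buffers, true);
            }
            if (e instanceof EndOfPartition) {
                return new PersistResult(buffers, true);
            }
            if (e instanceof Buffer buf) {
                buffers.add(buf.payload());
            }
        }
        // Neither marker seen yet: more data may still arrive on this channel.
        return new PersistResult(buffers, false);
    }

    public static void main(String[] args) {
        long cp = 5L;
        // Channel from A2, which has finished: persist everything before EndOfPartition.
        System.out.println(persistChannelState(
                List.of(new Buffer("x"), new Buffer("y"), new EndOfPartition()), cp));
        // Channel whose upstream is still running and whose barrier has arrived.
        System.out.println(persistChannelState(
                List.of(new Buffer("a"), new Barrier(cp), new Buffer("b")), cp));
    }
}
```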
