gaoyunhaii commented on pull request #14831:
URL: https://github.com/apache/flink/pull/14831#issuecomment-774166223


   Hi @pnowojski, thanks a lot for the review and the deep thoughts! Let me first go through the general concerns.
   
   For the first issue, Roman and I already had some discussion. We think we might solve it by making the upstream tasks wait until all the buffers have been flushed out of their sub-partitions before they finish. During this period the upstream task would still be running, so we could do unaligned checkpoints normally.
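   To make the idea concrete, here is a minimal sketch, assuming a single sub-partition and a downstream consumer simulated by direct method calls. `HeldUpstreamTask` and all of its methods are hypothetical and do not correspond to Flink classes; the model only shows that the task stays `RUNNING` (and hence checkpointable) until the last buffer has left the sub-partition.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of an upstream task that is held in RUNNING until its
// sub-partition has been fully drained. None of these names are Flink APIs.
class HeldUpstreamTask {
    enum State { RUNNING, FINISHED }

    private final Deque<String> subpartition = new ArrayDeque<>();
    private boolean endOfInput = false;
    private State state = State.RUNNING;

    // Producing side: append a buffer to the single simulated sub-partition.
    void emit(String buffer) {
        if (endOfInput) {
            throw new IllegalStateException("no buffers may be emitted after end of input");
        }
        subpartition.addLast(buffer);
    }

    // All records are produced, but the task must not finish yet.
    void endInput() {
        endOfInput = true;
        tryFinish();
    }

    // Simulates the downstream task consuming one buffer; returns null if empty.
    String consume() {
        String buffer = subpartition.pollFirst();
        tryFinish();
        return buffer;
    }

    // Only move to FINISHED once no buffer is left in the sub-partition.
    private void tryFinish() {
        if (endOfInput && subpartition.isEmpty()) {
            state = State.FINISHED;
        }
    }

    // While still RUNNING, an unaligned checkpoint could snapshot the in-flight buffers.
    boolean canSnapshotInFlightData() {
        return state == State.RUNNING;
    }

    int backlog() { return subpartition.size(); }

    State state() { return state; }
}
```

   With two emitted buffers, calling `endInput()` leaves the task `RUNNING`; it only becomes `FINISHED` after the second `consume()` call drains the sub-partition.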
   
   I also think that holding the upstream tasks until all the remaining data is consumed is a good direction that might simplify the problem. However, some of the details still seem hard to implement. For convenience, suppose we have one task B with two upstream tasks A1 and A2.
   
   The first concern is about the upstream tasks ignoring the barriers destined for the downstream tasks once `EndOfPartition` has been sent, in the aligned checkpoint case:
   
   1. If both A1 and A2 receive the barrier after `EndOfPartition`, task B would receive only `EndOfPartition` events and would know nothing about this checkpoint. The JM could not detect this situation either, since all the checkpoint-triggering RPCs succeed. The checkpoint would therefore time out and then cause a job failover.
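   The problem can be reproduced in a tiny model of B's barrier alignment. This is a sketch under stated assumptions: `AlignedDownstreamModel` is hypothetical (it is not Flink's barrier handler), it tracks a single checkpoint, and it assumes a channel that has delivered `EndOfPartition` counts as aligned. Even under that generous assumption, if no channel ever delivers a barrier, B never learns the checkpoint ID and so never triggers it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of barrier alignment in task B; not a Flink class.
class AlignedDownstreamModel {
    private final int numChannels;
    private final Set<Integer> barrierReceived = new HashSet<>();
    private final Set<Integer> endOfPartitionReceived = new HashSet<>();
    private Long pendingCheckpointId = null;   // stays null if no barrier ever arrives
    private Long triggeredCheckpointId = null;

    AlignedDownstreamModel(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(int channel, long checkpointId) {
        pendingCheckpointId = checkpointId;
        barrierReceived.add(channel);
        maybeTrigger();
    }

    void onEndOfPartition(int channel) {
        endOfPartitionReceived.add(channel);
        maybeTrigger();
    }

    // Assumption of this sketch: a finished channel counts as aligned.
    private void maybeTrigger() {
        if (pendingCheckpointId == null) {
            return; // B has never heard of the checkpoint, so there is nothing to trigger
        }
        for (int c = 0; c < numChannels; c++) {
            if (!barrierReceived.contains(c) && !endOfPartitionReceived.contains(c)) {
                return; // still waiting for this channel
            }
        }
        triggeredCheckpointId = pendingCheckpointId;
    }

    Long triggeredCheckpointId() { return triggeredCheckpointId; }
}
```

   Feeding it two `onEndOfPartition` calls leaves `triggeredCheckpointId()` at `null`, which is exactly the silent case that only surfaces later as a checkpoint timeout on the JM side.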
   
   
   The second concern is about letting the barriers "chase after" `EndOfPartition` in the unaligned checkpoint case:
   
   1. Since `EndOfPartition` is sent in `Task` instead of `StreamTask`, by the time it is sent the `StreamTask` has already finished its lifecycle, and the `SubtaskCheckpointCoordinator` has also been closed in `StreamTask#cleanupInvoke()`. As a result, we cannot snapshot the result partition, because the `ChannelStateWriter` is already closed.
   2. The issue from the aligned checkpoint case still exists: if both A1 and A2 receive the barrier after `EndOfPartition`, the barriers might not overtake `EndOfPartition`, and B would have no idea about this checkpoint.
   3. Aligned checkpoints with timeout are still not supported, since in that case the barrier is not prioritized and cannot overtake `EndOfPartition`.
   4. Sending events after `EndOfPartition` might break some assumptions of the network stack. We would need to add checks in both the sub-partition and the input channel to handle a barrier that arrives after the channel has already been closed due to `EndOfPartition`.
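   Points 3 and 4 can be illustrated with a simplified single-channel model, assuming a plain FIFO queue in which a prioritized barrier is simply placed at the head (the real priority handling in the network stack is more involved). `ChannelModel` is hypothetical. A prioritized barrier overtakes the queued `END_OF_PARTITION`, while a non-prioritized one lands behind it and arrives at an already-closed channel, where it has to be explicitly dropped or otherwise handled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one sub-partition feeding one input channel; not Flink API.
class ChannelModel {
    static final String END_OF_PARTITION = "END_OF_PARTITION";

    private final Deque<String> inFlight = new ArrayDeque<>();
    private boolean closed = false;      // set once the receiver consumed END_OF_PARTITION
    private int droppedAfterClose = 0;   // events that arrived after the channel was closed

    // Regular buffers and events are appended in order.
    void send(String bufferOrEvent) {
        inFlight.addLast(bufferOrEvent);
    }

    // A prioritized (unaligned) barrier jumps ahead of everything queued, including
    // END_OF_PARTITION; a non-prioritized one (aligned, aligned with timeout) queues behind.
    // Returns false if the receiver has already closed the channel.
    boolean sendBarrier(String barrier, boolean prioritized) {
        if (closed) {
            return false;
        }
        if (prioritized) {
            inFlight.addFirst(barrier);
        } else {
            inFlight.addLast(barrier);
        }
        return true;
    }

    // Receiver side: returns the next event, or null if nothing usable is available.
    String receive() {
        String next = inFlight.pollFirst();
        if (next == null) {
            return null;
        }
        if (closed) {
            droppedAfterClose++; // the extra check the channel would need after close
            return null;
        }
        if (END_OF_PARTITION.equals(next)) {
            closed = true;
        }
        return next;
    }

    boolean isClosed() { return closed; }

    int droppedAfterClose() { return droppedAfterClose; }
}
```

   Note that even when the prioritized barrier does overtake, this model says nothing about point 1: the buffers it overtakes would still have to be persisted by a `ChannelStateWriter` that is already closed.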
   
   
   There are also some implementation-related issues. These should be solvable, but they might introduce some complexity:
   
   1. We would need to introduce a new event with which the downstream task notifies the upstream task that it may exit. We could not reuse `CloseRequest`, since it works at the connection level and multiple tasks might share the same connection.
   2. Besides, the upstream task would need to keep a reference count of how many exit events it has received from the downstream tasks.
   3. Since the `EndOfPartition` event is sent in `Task`, we would also need some way to avoid affecting `BatchTask`. We might need to add a marker interface to `AbstractInvokable`.
   4. We would still need to make some modifications to the downstream tasks. Suppose A1 is running and A2 has finished: when B receives an unaligned checkpoint barrier from A1, it might decide to trigger the checkpoint and snapshot the buffers from A2. Thus the channel persistence logic in `RemoteInputChannel` would still need to be modified to take `EndOfPartition` into account.
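   Points 1 and 2 together could look roughly like the sketch below, assuming each exit event carries the index of the downstream subtask that sent it, which is what distinguishes it from a connection-level `CloseRequest`. `ExitEventTracker` and the exit event itself are hypothetical; the tracker uses a set of subtask indices instead of a bare counter so that a duplicated event from the same subtask is not counted twice.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical: neither this class nor the exit event exists in Flink.
class ExitEventTracker {
    private final int numConsumers;
    private final Set<Integer> acknowledged = new HashSet<>();
    private final CompletableFuture<Void> allConsumersExited = new CompletableFuture<>();

    ExitEventTracker(int numConsumers) {
        this.numConsumers = numConsumers;
        if (numConsumers == 0) {
            allConsumersExited.complete(null); // nothing to wait for
        }
    }

    // Called when the exit event from one downstream subtask arrives.
    synchronized void onExitEvent(int consumerSubtaskIndex) {
        if (consumerSubtaskIndex < 0 || consumerSubtaskIndex >= numConsumers) {
            throw new IllegalArgumentException("unknown consumer " + consumerSubtaskIndex);
        }
        // Set.add returns false for a duplicate, so repeated events are idempotent.
        if (acknowledged.add(consumerSubtaskIndex) && acknowledged.size() == numConsumers) {
            allConsumersExited.complete(null); // the upstream task may now finish
        }
    }

    // The upstream task would wait on this future before leaving RUNNING.
    CompletableFuture<Void> allConsumersExited() { return allConsumersExited; }

    synchronized int pending() { return numConsumers - acknowledged.size(); }
}
```

   With two consumers, the future completes only after exit events from both subtask 0 and subtask 1 have arrived, regardless of duplicates in between.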

