[ 
https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700591#comment-16700591
 ] 

Stephan Ewen edited comment on FLINK-10966 at 11/27/18 3:32 PM:
----------------------------------------------------------------

I need some clarification to help understand this:

 - In what situation would 9 sources have sent the barrier while one source has 
not?
 - Why can that source never send the barrier in your scenario?

There are some mechanisms in place to recover from such situations:
 - The block for checkpoint n is released if a barrier for n+1 arrives on any 
unblocked input.
 - Sources that are not yet ready send a "cancel barrier" instead of a 
checkpoint barrier.
 - Since Flink 1.5, with network flow control, data is no longer spilled to disk.
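
A minimal sketch of the first mechanism above, i.e. releasing the blocks held for checkpoint n as soon as a barrier for a newer checkpoint arrives. `BarrierTrackerSketch` is a hypothetical class that only models channel blocking, not Flink's actual `BarrierBuffer`:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: channels blocked while aligning checkpoint n are
 *  released as soon as a barrier for a newer checkpoint n+1 arrives. */
public class BarrierTrackerSketch {
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1;

    /** Called when a barrier for checkpointId arrives on the given channel. */
    public void onBarrier(int channel, long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            // a newer checkpoint supersedes the pending alignment:
            // release all blocks held for the old checkpoint
            blockedChannels.clear();
            currentCheckpointId = checkpointId;
        }
        // block this channel until the current alignment completes
        blockedChannels.add(channel);
    }

    public boolean isBlocked(int channel) {
        return blockedChannels.contains(channel);
    }

    public static void main(String[] args) {
        BarrierTrackerSketch t = new BarrierTrackerSketch();
        t.onBarrier(0, 1);  // checkpoint 1 barrier on channel 0
        t.onBarrier(1, 1);  // checkpoint 1 barrier on channel 1
        System.out.println(t.isBlocked(0)); // true: still aligning checkpoint 1
        t.onBarrier(2, 2);  // barrier for checkpoint 2 arrives on channel 2
        System.out.println(t.isBlocked(0)); // false: checkpoint 1 blocks released
        System.out.println(t.isBlocked(2)); // true: now aligning checkpoint 2
    }
}
```

Note that this release path only helps if barriers for a newer checkpoint actually reach the task on some unblocked input.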

The fact that the checkpoint coordinator does not send a "cancel message" when 
a checkpoint is declined is something that should be improved, but this is 
different from adjusting the barrier buffer logic.

Can you explain what changes in the logic could help here, without changing the 
semantics?



> Optimize the release blocking logic in BarrierBuffer
> ----------------------------------------------------
>
>                 Key: FLINK-10966
>                 URL: https://issues.apache.org/jira/browse/FLINK-10966
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> Issue:
> Currently, mixing CancelCheckpointMarker control events with data flow to 
> drive task to release blocking logic in BarrierBuffer may result in blocking 
> logic not being released in time, further leading to a large amount of data 
> being spilled to disk.
> The source code for this problem is as follows:
> {code:java}
> BufferOrEvent bufferOrEvent = next.get();
> if (isBlocked(bufferOrEvent.getChannelIndex())) {          // issue line
>    // if the channel is blocked, we just store the BufferOrEvent
>    bufferBlocker.add(bufferOrEvent);
>    checkSizeLimit();
> }
> else if (bufferOrEvent.isBuffer()) {
>    return bufferOrEvent;
> }
> else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
>    if (!endOfStream) {
>       // process barriers only if there is a chance of the checkpoint completing
>       processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
>    }
> }
> else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
>    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
> }
> else {
>    if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
>       processEndOfPartition();
>    }
>    return bufferOrEvent;
> }
> {code}
> Scenario:
> Consider a simple DAG: source -> map (network shuffle), with parallelism 10. 
> The checkpoint semantics is exactly-once.
> First checkpoint: the barriers of 9 source subtasks are received by all map 
> subtasks. One source subtask is blocked and therefore fails to send its 
> barrier. Eventually, the checkpoint fails due to timeout. At this point, the 
> 9 corresponding input channels are blocked because they have received the 
> barrier.
> Second checkpoint: the special source subtask is still blocked and cannot 
> send any events downstream, while the nine input channels remain blocked. In 
> the current implementation, the data or events received on them will not be 
> processed but stored directly, so the blocking on the downstream task will 
> not be released. The only hope is that the cached data reaches the maximum 
> limit.
> I think the main problem here is that we should not directly store data that 
> comes from blocked input channels. Especially when one input channel is 
> blocked by its upstream and nine input channels are marked as blocked, we 
> may not always be able to release the blocking.
> A better mechanism might be to send a notifyCheckpointFailed callback via 
> the CheckpointCoordinator, allowing each task to unblock itself. This would 
> make releasing the old checkpoint's alignment independent of triggering the 
> new checkpoint. If the checkpoint interval is long but the timeout is short, 
> the effect of this optimization is even more pronounced.
> Ultimately, we want to reduce unnecessary blocking and data spill to disk.
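
The mechanism proposed in the description could be sketched roughly as follows. All names here (`notifyCheckpointFailed`, the coordinator fan-out, the task state) are hypothetical illustrations of the idea, not existing Flink API: when a checkpoint is declined or times out, the coordinator notifies every task, and each task still aligning for that checkpoint releases its blocked channels immediately instead of waiting for the next checkpoint's barriers or the spill size limit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of unblocking tasks on checkpoint failure. */
class UnblockOnFailureSketch {

    /** Per-task alignment state. */
    static class Task {
        long pendingCheckpointId = -1;  // checkpoint currently being aligned
        int blockedChannels = 0;        // channels blocked for that alignment

        void startAlignment(long checkpointId, int blocked) {
            pendingCheckpointId = checkpointId;
            blockedChannels = blocked;
        }

        /** Hypothetical notifyCheckpointFailed callback. */
        void notifyCheckpointFailed(long checkpointId) {
            if (checkpointId == pendingCheckpointId) {
                blockedChannels = 0;       // release all blocked input channels
                pendingCheckpointId = -1;  // abandon the stale alignment
            }
        }
    }

    /** Coordinator side: fan the failure notification out to all tasks. */
    static void onCheckpointFailed(List<Task> tasks, long checkpointId) {
        for (Task t : tasks) {
            t.notifyCheckpointFailed(checkpointId);
        }
    }

    public static void main(String[] args) {
        Task map = new Task();
        map.startAlignment(1, 9);     // 9 of 10 channels delivered barrier 1
        List<Task> tasks = new ArrayList<>();
        tasks.add(map);
        onCheckpointFailed(tasks, 1); // checkpoint 1 is declined / times out
        System.out.println(map.blockedChannels); // 0: unblocked at once
    }
}
```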



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
