vinoyang created FLINK-10966:
--------------------------------

             Summary: Optimize the release blocking logic in BarrierBuffer
                 Key: FLINK-10966
                 URL: https://issues.apache.org/jira/browse/FLINK-10966
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
            Reporter: vinoyang
            Assignee: vinoyang


Issue:

Currently, the CancelCheckpointMarker control event is mixed into the data flow 
to drive a task to release the blocking logic in BarrierBuffer. This may result 
in the blocking not being released in time, which in turn leads to a large 
amount of data being spilled to disk.


The source code where this problem arises is as follows:
{code:java}
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) {          //issue line
   // if the channel is blocked, we just store the BufferOrEvent
   bufferBlocker.add(bufferOrEvent);
   checkSizeLimit();
}
else if (bufferOrEvent.isBuffer()) {
   return bufferOrEvent;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
   if (!endOfStream) {
      // process barriers only if there is a chance of the checkpoint completing
      processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
   }
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
   processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
}
else {
   if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
      processEndOfPartition();
   }
   return bufferOrEvent;
}

{code}
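
Because the isBlocked(...) check comes before any event-type check, a 
CancelCheckpointMarker arriving on an already-blocked channel is merely cached 
and never reaches processCancellationBarrier. The following standalone sketch 
models just this branch ordering (the class and all of its names are made up 
for illustration, this is not the real BarrierBuffer API):
{code:java}
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Queue;

// Standalone model of the branch order in the snippet above. All names are
// illustrative only; this is not the real BarrierBuffer implementation.
public class BlockedChannelModel {

   // simplified stand-in for BufferOrEvent
   static class Element {
      final int channel;
      final boolean isBuffer;
      final boolean isCancelMarker;

      Element(int channel, boolean isBuffer, boolean isCancelMarker) {
         this.channel = channel;
         this.isBuffer = isBuffer;
         this.isCancelMarker = isCancelMarker;
      }
   }

   private final BitSet blockedChannels = new BitSet();
   private final Queue<Element> cached = new ArrayDeque<>();

   void markBlocked(int channel) {
      blockedChannels.set(channel);
   }

   // mirrors the order of checks in the quoted code
   String handle(Element e) {
      if (blockedChannels.get(e.channel)) {
         // same as the "issue line": cached before any event-type check
         cached.add(e);
         return "cached (" + cached.size() + " element(s) buffered)";
      }
      else if (e.isBuffer) {
         return "data forwarded";
      }
      else if (e.isCancelMarker) {
         // only reachable when the channel is NOT blocked
         blockedChannels.clear();
         cached.clear();
         return "alignment cancelled, all channels unblocked";
      }
      else {
         return "other event forwarded";
      }
   }

   public static void main(String[] args) {
      BlockedChannelModel model = new BlockedChannelModel();
      // 9 of 10 channels are already blocked by the failed first checkpoint
      for (int ch = 0; ch < 9; ch++) {
         model.markBlocked(ch);
      }
      // a cancellation marker arriving on a blocked channel is only cached
      System.out.println(model.handle(new Element(3, false, true)));
      // data on the blocked channels keeps piling up as well
      System.out.println(model.handle(new Element(5, true, false)));
   }
}
{code}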

Scenario:
Consider a simple DAG, source -> map (network shuffle), with a parallelism of 
10. The checkpoint semantics are exactly-once.


First checkpoint: the barriers of 9 source subtasks are received by each map 
subtask. The remaining source subtask is blocked and fails to send its barrier, 
so the checkpoint eventually fails due to timeout. At this point, the 9 
corresponding input channels are blocked because they have already received a 
barrier.


Second checkpoint: at this point, that particular source subtask is still 
blocked and cannot send any events downstream, while the nine input channels 
remain blocked. In the current implementation, any data or events received on 
these blocked channels are not processed but stored directly, so the blocking 
caused by the barrier alignment in the downstream task will not be released. 
The only way out is for the cached data to reach the maximum size limit.
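
To be concrete about the last point: the checkSizeLimit() call in the snippet 
above is, roughly speaking, the only remaining escape hatch in this state. A 
simplified paraphrase of its logic (the field and method names here only 
approximate the real ones, and the body is not the exact implementation):
{code:java}
// Simplified paraphrase, not the exact implementation: the alignment is
// only abandoned, and the channels unblocked, once the cached/spilled bytes
// exceed the configured maximum -- i.e. after a large amount of data has
// already been buffered or written to disk.
private void checkSizeLimit() throws Exception {
   if (maxBufferedBytes > 0 && numBytesBuffered > maxBufferedBytes) {
      // give up on the current alignment and release all blocked channels
      releaseBlocksAndResetBarriers();
   }
}
{code}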


I think the main problem here is that we should not directly store data that 
comes from blocked input channels. In particular, when one input channel is 
stalled by its upstream and the other nine input channels are marked as 
blocked, we may never be able to release the blocking.


A better mechanism might be to send a notifyCheckpointFailed callback via the 
CheckpointCoordinator, allowing each task to unblock itself. This would make 
the release of the old checkpoint's alignment independent of the triggering of 
a new checkpoint. If the checkpoint interval is very long but the timeout is 
very short, the effect of this optimization will be even more obvious.
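
As a sketch of that direction (the method names below are hypothetical, they do 
not exist today; this only illustrates the proposed flow): when the 
CheckpointCoordinator notifies a task that a pending checkpoint has failed, the 
task could forward the notification to its barrier handler, which then drops 
the stale alignment on its own instead of waiting for an in-band marker or for 
the size limit:
{code:java}
// Hypothetical callback on the task side; notifyCheckpointFailed(...) and
// abortPendingAlignment(...) are proposed names, not an existing API.
public void notifyCheckpointFailed(long checkpointId) throws Exception {
   // forward the coordinator's failure notification to the barrier handler
   barrierHandler.abortPendingAlignment(checkpointId);
}

// Inside the barrier handler (e.g. BarrierBuffer): release the blocked
// input channels of the stale alignment without relying on in-band markers.
public void abortPendingAlignment(long checkpointId) throws IOException {
   if (checkpointId == currentCheckpointId && numBarriersReceived > 0) {
      // unblock all channels and hand the cached data back to the normal path
      releaseBlocksAndResetBarriers();
   }
}
{code}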


Ultimately, we want to reduce unnecessary blocking and data spilling to disk.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
