[
https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhijiang updated FLINK-10966:
-----------------------------
Comment: was deleted
(was: [~StephanEwen], I am interested in the actor-style stream task you
mentioned. Are there any JIRAs for this proposal already? I would watch them
and am willing to give a hand if needed. :))
> Optimize the release blocking logic in BarrierBuffer
> ----------------------------------------------------
>
> Key: FLINK-10966
> URL: https://issues.apache.org/jira/browse/FLINK-10966
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
>
> Issue:
> Currently, relying on CancelCheckpointMarker control events mixed into the
> data flow to release the blocking logic in BarrierBuffer means the blocks may
> not be released in time, which can lead to a large amount of data being
> spilled to disk.
> The relevant source code is as follows:
> {code:java}
> BufferOrEvent bufferOrEvent = next.get();
> if (isBlocked(bufferOrEvent.getChannelIndex())) { // issue line
>     // if the channel is blocked, we just store the BufferOrEvent
>     bufferBlocker.add(bufferOrEvent);
>     checkSizeLimit();
> }
> else if (bufferOrEvent.isBuffer()) {
>     return bufferOrEvent;
> }
> else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
>     if (!endOfStream) {
>         // process barriers only if there is a chance of the checkpoint completing
>         processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
>     }
> }
> else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
>     processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
> }
> else {
>     if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
>         processEndOfPartition();
>     }
>     return bufferOrEvent;
> }
> {code}
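To make the failure mode concrete: while a channel is blocked, the code above stores every incoming buffer instead of processing it, so the buffered volume only grows until the size limit forces a release. The following is a minimal, self-contained model of that accumulation (class and method names here are hypothetical illustrations, not the actual Flink internals):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the buffering path: while a channel is blocked,
// every buffer from it is stored rather than processed, so the stored
// volume grows until a size limit forces an alignment abort.
class BlockedChannelModel {
    private final Queue<byte[]> stored = new ArrayDeque<>();
    private final long maxBufferedBytes;
    private long bufferedBytes = 0;

    BlockedChannelModel(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** Stores a buffer from a blocked channel; returns true once the
     *  size limit is exceeded (mirrors the checkSizeLimit() call above). */
    boolean addBlocked(byte[] buffer) {
        stored.add(buffer);
        bufferedBytes += buffer.length;
        return bufferedBytes > maxBufferedBytes;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

In the scenario below, nothing unblocks the channels, so this size limit is the only release path, which is exactly the problem being reported.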
> Scenarios:
> Consider a simple DAG, source -> map (network shuffle), with a parallelism
> of 10 and exactly-once checkpoint semantics.
> First checkpoint: the barriers of 9 source subtasks are received by every
> map subtask. The remaining source subtask is blocked and fails to send its
> barrier, so the checkpoint eventually fails due to timeout. At this point,
> the 9 corresponding input channels are blocked because they have already
> received a barrier.
> Second checkpoint: the special source subtask is still blocked and cannot
> send any events downstream, and the nine input channels remain blocked.
> Under the current implementation, the data and events received on those
> channels are not processed but stored directly, so the blocks in the
> downstream task are never released. The only escape is for the cached data
> to reach its maximum size limit.
> I think the main problem is that we should not unconditionally store data
> that arrives on blocked input channels. In particular, when one upstream
> channel is stalled and the other nine input channels are marked as blocked,
> we may never be able to release the blocking.
> A better mechanism might be to send a notifyCheckpointFailed callback via
> the CheckpointCoordinator, allowing each task to unblock itself. This would
> make the release of the old checkpoint alignment independent of the trigger
> of the new checkpoint. If the checkpoint interval is long but the timeout is
> short, the effect of this optimization will be even more pronounced.
> Ultimately, we want to reduce unnecessary blocking and data spilled to disk.
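The proposed callback path could look roughly like the following sketch. To be clear, notifyCheckpointFailed and the class shown here are assumptions derived from this description, not existing Flink interfaces; the point is only that the unblock signal arrives out-of-band from the coordinator rather than as a marker travelling through the (stalled) data stream:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed out-of-band unblocking, under the assumption of a
// hypothetical notifyCheckpointFailed callback. Not the real Flink API.
class SketchBarrierBuffer {
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1;

    // Exactly-once alignment: once a channel delivers its barrier,
    // further data from it is blocked until alignment completes.
    void onBarrier(int channelIndex, long checkpointId) {
        currentCheckpointId = checkpointId;
        blockedChannels.add(channelIndex);
    }

    boolean isBlocked(int channelIndex) {
        return blockedChannels.contains(channelIndex);
    }

    // Proposed: invoked via the CheckpointCoordinator when the checkpoint
    // times out, so blocked channels are released immediately instead of
    // waiting for a CancelCheckpointMarker to arrive through the data flow.
    void notifyCheckpointFailed(long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            blockedChannels.clear(); // release blocks; replay buffered data
            currentCheckpointId = -1;
        }
    }
}
```

With this shape, the second checkpoint in the scenario above could start from unblocked channels as soon as the first one times out, independent of when (or whether) the next checkpoint is triggered.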
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)