[ 
https://issues.apache.org/jira/browse/FLINK-19142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295945#comment-17295945
 ] 

Zhu Zhu commented on FLINK-19142:
---------------------------------

In my understanding, the problem is that {{MergingSharedSlotProfileRetriever}} 
does not properly set the {{previousExecutionGraphAllocations}} of a 
{{SlotProfile}}. So that some restarted region may take the previous slots of 
other failed regions, and the state local recovery will be affected.

To fix the problem, we need {{SlotProfile.previousExecutionGraphAllocations}} 
to include previous allocationId of all the vertices which need to be restarted 
at that time.

The suggestion from [~azagrebin] to "give to MergingSharedSlotProfileRetriever 
all previous AllocationIDs of bulks which are going to run at the same time." 
is theoretically correct. However, it is not easy for the scheduler to identify 
which bulks will run at the same time. A simpler solution I can think of is to 
find all the vertices which are *not* DEPLOYING/RUNNING/FINISHED at that 
moment,  which indicates they may still want their previous slots,  and set 
their prior allocation IDs to {{MergingSharedSlotProfileRetriever}}.

WDYT? [~trohrmann][~chesnay]

> Investigate slot hijacking from preceding pipelined regions after failover
> --------------------------------------------------------------------------
>
>                 Key: FLINK-19142
>                 URL: https://issues.apache.org/jira/browse/FLINK-19142
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.12.0
>            Reporter: Andrey Zagrebin
>            Assignee: Zhu Zhu
>            Priority: Major
>             Fix For: 1.13.0
>
>
> The ticket originates from [this PR 
> discussion|https://github.com/apache/flink/pull/13181#discussion_r481087221].
> The previous AllocationIDs are used by 
> PreviousAllocationSlotSelectionStrategy to schedule subtasks into the slot 
> where they were previously executed before a failover. If the previous slot 
> (AllocationID) is not available, we do not want subtasks to take previous 
> slots (AllocationIDs) of other subtasks.
> The MergingSharedSlotProfileRetriever gets all previous AllocationIDs of the 
> bulk from SlotSharingExecutionSlotAllocator but only from the current bulk. 
> The previous AllocationIDs of other bulks stay unknown. Therefore, the 
> current bulk can potentially hijack the previous slots from the preceding 
> bulks. On the other hand the previous AllocationIDs of other tasks should be 
> taken if the other tasks are not going to run at the same time, e.g. not 
> enough resources after failover or other bulks are done.
> One way to do it may be to give to MergingSharedSlotProfileRetriever all 
> previous AllocationIDs of bulks which are going to run at the same time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to