[
https://issues.apache.org/jira/browse/FLINK-14909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gary Yao closed FLINK-14909.
----------------------------
Resolution: Fixed
master:
24b77dbdfe072ab6087284b34f95e44b50e2610f
33d81ccb69ac82ef030c54cb12235dec540554dd
7902947cadda4b730d816f70f9ecb1e55215f373
> Let tasks in a batch get scheduled in topological order and subtaskIndex
> ascending pattern
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-14909
> URL: https://issues.apache.org/jira/browse/FLINK-14909
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> I'd propose to let tasks in a batch get scheduled in topological order and
> subtaskIndex ascending pattern.
> There can be 2 benefits:
> 1. there would be less chance for a task to get launched before its upstream
> tasks, which reduces {{requestPartitionState}} RPCs to JobMaster.
> 2. logs could be more readable, e.g.
> ordered:
> Source: source (1/20) ... switched from CREATED to SCHEDULED.
> Source: source (2/20) ... switched from CREATED to SCHEDULED.
> ...
> Source: source (20/20) ... switched from CREATED to SCHEDULED.
> Flat Map (1/20) ... switched from CREATED to SCHEDULED.
> ...
> Flat Map (20/20) ... switched from CREATED to SCHEDULED.
> disordered:
> Source: source (1/20) ... switched from CREATED to SCHEDULED.
> Flat Map (11/20) ... switched from CREATED to SCHEDULED.
> Source: source (19/20) ... switched from CREATED to SCHEDULED.
> Flat Map (2/20) ... switched from CREATED to SCHEDULED.
> ...
> The detailed proposal is:
> 1. change scheduling related methods to take and return tasks as {{List}}
> instead of {{Collection}} in {{DefaultScheduler}} and related classes
> 2. sort the tasks received in {{DefaultScheduler#allocateSlotsAndDeploy}} to
> be topological sorted (primary) and subtaskIndex ascending (secondary) order
> before scheduling them. The tasks scheduled by {{EagerSchedulingStrategy}}
> can be in order with this change.
> 3. Change {{LazyFromSourcesSchedulingStrategy}} to schedule tasks in the
> original order it receives the tasks, which is usually in the desired order.
> We do this because in FLINK-14162 we may invoke #allocateSlotsAndDeploy on
> each vertex individually in this scheduling strategy, so that the ordering in
> {{DefaultScheduler}} would not work.
> Note that it's just best effort since we always receives a Set of tasks in
> #restartTasks. But it should be Ok since the disordering does not result in
> more {{requestPartitionState}} RPCs with this scheduling strategy, and batch
> jobs are usually in small regions so that the log disordering is not that
> obvious.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)