[
https://issues.apache.org/jira/browse/FLINK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu closed FLINK-21110.
---------------------------
Resolution: Done
> Optimize scheduler performance for large-scale jobs
> ---------------------------------------------------
>
> Key: FLINK-21110
> URL: https://issues.apache.org/jira/browse/FLINK-21110
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Assignee: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: Illustration of Group.jpg
>
>
> According to the results of the scheduler benchmarks we implemented in
> FLINK-20612, the bottlenecks of deploying and running a large-scale job in
> Flink lie mainly in the following procedures:
> |Procedure|Time complexity|
> |Initializing ExecutionGraph|O(N^2)|
> |Building DefaultExecutionTopology|O(N^2)|
> |Initializing PipelinedRegionSchedulingStrategy|O(N^2)|
> |Scheduling downstream tasks when a task finishes|O(N^2)|
> |Calculating tasks to restart when a failover occurs|O(N^2)|
> |Releasing result partitions|O(N^3)|
> These procedures are all related to the complexity of the topology in the
> ExecutionGraph. Between two vertices connected with an all-to-all edge, every
> upstream IntermediateResultPartition is connected to every downstream
> ExecutionVertex, so the computational complexity of building and traversing
> all these edges is O(N^2).
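> To make the quadratic blow-up concrete, here is a minimal sketch of the naive
> wiring, assuming simplified stand-in classes rather than the actual
> ExecutionGraph types:
> {code:java}
> // Minimal sketch with simplified, hypothetical classes (not the actual
> // Flink ExecutionGraph code). With an all-to-all edge, every one of the N
> // upstream result partitions is wired to every one of the N downstream
> // vertices, so building the edges alone costs O(N^2) time and memory.
> import java.util.ArrayList;
> import java.util.List;
>
> public class AllToAllEdgeSketch {
>
>     static class ResultPartition {
>         final int id;
>         ResultPartition(int id) { this.id = id; }
>     }
>
>     static class ExecutionVertex {
>         final int id;
>         ExecutionVertex(int id) { this.id = id; }
>     }
>
>     // One edge object per (partition, vertex) pair.
>     static class ExecutionEdge {
>         final ResultPartition source;
>         final ExecutionVertex target;
>         ExecutionEdge(ResultPartition source, ExecutionVertex target) {
>             this.source = source;
>             this.target = target;
>         }
>     }
>
>     static List<ExecutionEdge> connectAllToAll(
>             List<ResultPartition> partitions, List<ExecutionVertex> vertices) {
>         List<ExecutionEdge> edges = new ArrayList<>();
>         for (ResultPartition p : partitions) {      // N iterations
>             for (ExecutionVertex v : vertices) {    // N iterations each
>                 edges.add(new ExecutionEdge(p, v)); // N^2 edge objects in total
>             }
>         }
>         return edges;
>     }
> }
> {code}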
> As for memory usage, we currently use ExecutionEdges to store the connection
> information. For the all-to-all distribution type, there are O(N^2)
> ExecutionEdges. We tested a simple job with only two vertices, each with a
> parallelism of 10k, connected with an all-to-all edge. It takes 4.175 GiB
> (estimated via MXBean) to store the resulting 100 million ExecutionEdges.
> Most large-scale jobs contain more than two vertices with large parallelisms,
> so they cost even more time and memory to deploy.
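> The heap-usage figures above can be approximated with a rough
> MemoryMXBean-based measurement along the following lines (a sketch, not
> necessarily the exact procedure used; buildEdges() is a hypothetical
> placeholder for constructing the 10k x 10k ExecutionEdges):
> {code:java}
> import java.lang.management.ManagementFactory;
> import java.lang.management.MemoryMXBean;
> import java.util.ArrayList;
> import java.util.List;
>
> public class HeapFootprintEstimate {
>
>     public static void main(String[] args) {
>         MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
>
>         memoryBean.gc();
>         long before = memoryBean.getHeapMemoryUsage().getUsed();
>
>         // Keep a reference to the edges so they are not collected before the
>         // second measurement.
>         List<Object> edges = buildEdges(10_000, 10_000);
>
>         memoryBean.gc();
>         long after = memoryBean.getHeapMemoryUsage().getUsed();
>
>         System.out.printf("Estimated footprint of %d edges: %.3f MiB%n",
>                 edges.size(), (after - before) / 1024.0 / 1024.0);
>     }
>
>     private static List<Object> buildEdges(int upstreamParallelism, int downstreamParallelism) {
>         // Hypothetical placeholder: construct the N x N ExecutionEdge objects here.
>         return new ArrayList<>();
>     }
> }
> {code}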
> As we can see, for two JobVertices connected with the all-to-all distribution
> type, all IntermediateResultPartitions produced by the upstream
> ExecutionVertices are isomorphic: the sets of downstream ExecutionVertices
> they connect to are exactly the same. The downstream ExecutionVertices
> belonging to the same JobVertex are also isomorphic, since the upstream
> ResultPartitions they connect to are the same as well.
> Since every JobEdge has exactly one distribution type, we can divide the
> vertices and result partitions into groups according to the distribution type
> of the JobEdge.
> For the all-to-all distribution type, since all downstream vertices are
> isomorphic, they belong to a single group, and all the upstream result
> partitions are connected to this group. Likewise, all the upstream result
> partitions belong to a single group, and all the downstream vertices are
> connected to this group. Previously, iterating over all the downstream
> vertices meant looping over them N times, which leads to a complexity of
> O(N^2). Now, since all upstream result partitions are connected to one
> downstream group, we only need to loop over them once, with a complexity of
> O(N).
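> A sketch of this all-to-all grouping, again with simplified illustrative
> classes rather than the actual implementation:
> {code:java}
> import java.util.List;
> import java.util.stream.Collectors;
>
> public class AllToAllGroupingSketch {
>
>     // One shared group object per side replaces the N^2 individual edges.
>     static class ConsumerVertexGroup {
>         final List<Integer> vertexIds;
>         ConsumerVertexGroup(List<Integer> vertexIds) { this.vertexIds = vertexIds; }
>     }
>
>     static class ConsumedPartitionGroup {
>         final List<Integer> partitionIds;
>         ConsumedPartitionGroup(List<Integer> partitionIds) { this.partitionIds = partitionIds; }
>     }
>
>     static class ResultPartition { int id; ConsumerVertexGroup consumers; }
>     static class ExecutionVertex { int id; ConsumedPartitionGroup consumed; }
>
>     static void connectAllToAll(List<ResultPartition> partitions, List<ExecutionVertex> vertices) {
>         // Build each group exactly once: O(N).
>         ConsumerVertexGroup vertexGroup = new ConsumerVertexGroup(
>                 vertices.stream().map(v -> v.id).collect(Collectors.toList()));
>         ConsumedPartitionGroup partitionGroup = new ConsumedPartitionGroup(
>                 partitions.stream().map(p -> p.id).collect(Collectors.toList()));
>
>         // Every partition shares the same consumer group and every vertex the
>         // same consumed-partition group: O(N) references instead of O(N^2) edges.
>         for (ResultPartition p : partitions) { p.consumers = vertexGroup; }
>         for (ExecutionVertex v : vertices) { v.consumed = partitionGroup; }
>     }
>
>     // Visiting all downstream vertices is now one O(N) pass over the single
>     // shared group, instead of N passes of size N.
>     static void forEachConsumer(ConsumerVertexGroup group) {
>         for (int vertexId : group.vertexIds) {
>             // e.g. schedule or notify vertexId here
>         }
>     }
> }
> {code}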
> For the pointwise distribution type, since each result partition is connected
> to different downstream vertices, the partitions belong to different groups.
> Likewise, the vertices belong to different groups. Since each result
> partition group is connected to exactly one vertex group, the computational
> complexity of looping over them is still O(N).
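> A matching sketch for the pointwise case, assuming for simplicity that the
> two vertices have equal parallelism so that partition i feeds vertex i
> (illustrative classes only):
> {code:java}
> import java.util.Collections;
> import java.util.List;
>
> public class PointwiseGroupingSketch {
>
>     static class ConsumerVertexGroup {
>         final List<Integer> vertexIds;
>         ConsumerVertexGroup(List<Integer> vertexIds) { this.vertexIds = vertexIds; }
>     }
>
>     static class ConsumedPartitionGroup {
>         final List<Integer> partitionIds;
>         ConsumedPartitionGroup(List<Integer> partitionIds) { this.partitionIds = partitionIds; }
>     }
>
>     static class ResultPartition { int id; ConsumerVertexGroup consumers; }
>     static class ExecutionVertex { int id; ConsumedPartitionGroup consumed; }
>
>     static void connectPointwise(List<ResultPartition> partitions, List<ExecutionVertex> vertices) {
>         // One small group per partition/vertex pair: still O(N) overall.
>         for (int i = 0; i < partitions.size(); i++) {
>             partitions.get(i).consumers =
>                     new ConsumerVertexGroup(Collections.singletonList(vertices.get(i).id));
>             vertices.get(i).consumed =
>                     new ConsumedPartitionGroup(Collections.singletonList(partitions.get(i).id));
>         }
>     }
> }
> {code}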
> !Illustration of Group.jpg|height=249!
> After we group the result partitions and vertices, ExecutionEdges are no
> longer needed. For the test job mentioned above, the optimization reduces the
> memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC,
> and the time cost drops from 62.090 seconds to 8.551 seconds (with 10k
> parallelism).
>
> The detailed design doc:
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)