[ 
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-23005:
-------------------------------

    Assignee: Zhilong Hong

> Optimize the deployment of tasks
> --------------------------------
>
>                 Key: FLINK-23005
>                 URL: https://issues.apache.org/jira/browse/FLINK-23005
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Assignee: Zhilong Hong
>            Priority: Major
>             Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 so far have improved the 
> performance of job initialization, failover, and partition release. 
> However, task deployment is still slow. For a job with two vertices, each 
> with a parallelism of 8k and connected by an all-to-all edge, it takes 
> 32.611s to deploy all the tasks and have them transition to running. With 
> a parallelism of 16k, it may take more than 2 minutes.
> Because TaskDeploymentDescriptors are created in the main thread of the 
> jobmanager, the jobmanager cannot process other Akka messages, such as 
> heartbeats and task status updates, for more than two minutes.
>  
> All in all, there are currently two issues in the deployment of tasks for 
> large-scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after task deployment. For a 
> streaming job, this causes a failover of the entire region. The job may 
> never transition to running, since another heartbeat timeout may occur 
> while the tasks are being redeployed.
> h3. Proposal
> Task deployment involves the following steps:
>  # The jobmanager creates a TaskDeploymentDescriptor for each task in the 
> main thread
>  # Each TaskDeploymentDescriptor is serialized in the future executor
>  # Akka transports the TaskDeploymentDescriptors to TaskExecutors via RPC calls
>  # TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> with parallelism _N_ that are connected by an all-to-all edge, we currently 
> calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share 
> the same ShuffleDescriptors, since they all consume the same 
> IntermediateResultPartitions. We don't need to calculate the 
> ShuffleDescriptors for each downstream vertex individually; we can cache 
> them. This decreases the overall complexity of calculating 
> TaskDeploymentDescriptors from O(N^2) to O(N).
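
The caching idea above can be sketched as follows. This is a minimal illustration, not Flink code: the class name, the string-based "descriptors", and the result ID key are all hypothetical stand-ins. The counter only exists to make the difference between O(N^2) and O(N) visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names are actual Flink classes.
class ShuffleDescriptorCacheSketch {
    // Number of individual descriptors computed so far.
    long computed = 0;

    // Cache keyed by a (hypothetical) intermediate result ID.
    private final Map<String, List<String>> cache = new HashMap<>();

    // Builds one descriptor per upstream partition: O(N) work per call.
    private List<String> computeDescriptors(String resultId, int upstreamParallelism) {
        List<String> descriptors = new ArrayList<>(upstreamParallelism);
        for (int i = 0; i < upstreamParallelism; i++) {
            descriptors.add(resultId + "#" + i);
            computed++;
        }
        return descriptors;
    }

    // Without a cache, every downstream task recomputes the full list.
    List<String> descriptorsUncached(String resultId, int upstreamParallelism) {
        return computeDescriptors(resultId, upstreamParallelism);
    }

    // With a cache, the list is computed once and shared by all downstream tasks.
    List<String> descriptorsCached(String resultId, int upstreamParallelism) {
        return cache.computeIfAbsent(
                resultId, id -> computeDescriptors(id, upstreamParallelism));
    }
}
```

Calling `descriptorsUncached` once for each of N downstream tasks computes N * N descriptors in total, while calling `descriptorsCached` the same number of times computes only N.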
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of Akka messages and 
> reduce the transmission of replicated data over the network, this serialized 
> value can be compressed.
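
A rough sketch of "serialize once, compress, and cache the bytes", using only JDK classes. The class and method names are hypothetical, plain Java serialization plus Deflate is an assumption made for illustration, and the actual serializer and compression scheme used in Flink may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch; strings stand in for ShuffleDescriptors.
class CompressedSerializedValueSketch {

    // Serializes the descriptors and compresses the result in one pass.
    static byte[] serializeAndCompress(ArrayList<String> descriptors) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out =
                new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeObject(descriptors);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the stream above finishes the Deflate stream.
        return bytes.toByteArray();
    }

    // Receiver side: decompresses and deserializes the cached bytes.
    @SuppressWarnings("unchecked")
    static ArrayList<String> decompressAndDeserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(
                new InflaterInputStream(new ByteArrayInputStream(data)))) {
            return (ArrayList<String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The byte array returned by `serializeAndCompress` is what would be cached and attached to every descriptor that shares it, so the serialization and compression cost is paid once rather than _N_ times.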
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
> serialized value is more than 700 kilobytes. After compression, it is 
> about 200 kilobytes. The overall size of 8k TaskDeploymentDescriptors is 
> more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector.
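
The 1.6 gigabyte figure is consistent with each of the 8k descriptors carrying its own copy of the roughly 200 kilobyte compressed value. A small check of that arithmetic, assuming decimal units and treating the quoted sizes as round numbers (the class and method names are hypothetical):

```java
// Hypothetical helper for a back-of-the-envelope size estimate.
class DescriptorSizeEstimate {

    // Total bytes held on the heap if every descriptor carries its own copy.
    static long totalBytes(int descriptorCount, long bytesPerCopy) {
        return (long) descriptorCount * bytesPerCopy;
    }

    public static void main(String[] args) {
        long total = totalBytes(8_000, 200_000L); // 8k descriptors, ~200 KB each
        System.out.println(total / 1_000_000_000.0 + " GB");
    }
}
```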
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> distributed via the blob server if their sizes exceed a certain threshold 
> (defined by {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This ensures that the jobmanager doesn't need to 
> keep all the copies in heap memory until all the TaskDeploymentDescriptors 
> are sent. There is only one copy, on the blob server. As with the 
> JobInformation, we can distribute the cached ShuffleDescriptors via the 
> blob server if their overall size exceeds the threshold.
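
The threshold-based offloading described above might look roughly like this. It is a sketch under stated assumptions: `InMemoryBlobStore` is a hypothetical stand-in for the blob server, `MaybeOffloadedValue` is an illustrative holder type, and `minOffloadSize` plays the role of {{blob.offload.minsize}}.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the Flink blob server API.
class OffloadSketch {

    // Stand-in for a blob server: stores one copy per key.
    static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();

        String put(byte[] value) {
            String key = "blob-" + blobs.size();
            blobs.put(key, value);
            return key;
        }

        byte[] get(String key) {
            return blobs.get(key);
        }
    }

    // Holds either the inline bytes or a blob key; exactly one is non-null.
    static final class MaybeOffloadedValue {
        final byte[] inline;
        final String blobKey;

        MaybeOffloadedValue(byte[] inline, String blobKey) {
            this.inline = inline;
            this.blobKey = blobKey;
        }

        boolean isOffloaded() {
            return blobKey != null;
        }
    }

    // Sender side: offload only if the value exceeds the threshold.
    static MaybeOffloadedValue maybeOffload(
            byte[] serialized, long minOffloadSize, InMemoryBlobStore store) {
        if (serialized.length > minOffloadSize) {
            return new MaybeOffloadedValue(null, store.put(serialized));
        }
        return new MaybeOffloadedValue(serialized, null);
    }

    // Receiver side: fetch from the store only when the value was offloaded.
    static byte[] resolve(MaybeOffloadedValue value, InMemoryBlobStore store) {
        return value.isOffloaded() ? store.get(value.blobKey) : value.inline;
    }
}
```

With this shape, a message for an offloaded value carries only a short key, and the large payload exists once in the store instead of once per pending message.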
> h3. Summary
> In summary, the optimization of task deployment introduces a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> the ShuffleDescriptors. If the size of the value exceeds a certain 
> threshold, the value is distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to measure the effect 
> of our optimization. We chose a streaming job for the experiment because no 
> task will be running until all tasks are deployed, which avoids other 
> disturbing factors. The job contains two vertices: a source and a sink. They 
> are connected by an all-to-all edge.
> The results below show the time interval between the timestamp of 
> the first task that transitions to _deploying_ and the timestamp of the last 
> task that transitions to _running_:
> ||Parallelism||Before||After ||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|
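
For reference, the speedups implied by the table can be derived directly from the measured times. The helper below is a hypothetical sketch that only divides the numbers reported above; it yields roughly 5.0x for 8000*8000 and roughly 6.7x for 16000*16000.

```java
// Hypothetical helper; the inputs are the measurements from the table above.
class DeploymentSpeedup {

    // Ratio of the time before the optimization to the time after it.
    static double speedup(double beforeSeconds, double afterSeconds) {
        return beforeSeconds / afterSeconds;
    }

    public static void main(String[] args) {
        System.out.printf("8000*8000:   %.2fx%n", speedup(32.611, 6.480));
        System.out.printf("16000*16000: %.2fx%n", speedup(128.408, 19.051));
    }
}
```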



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
