[
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhilong Hong updated FLINK-23005:
---------------------------------
Summary: Cache the compressed serialized value of ShuffleDescriptors (was:
Optimize the deployment of tasks)
> Cache the compressed serialized value of ShuffleDescriptors
> -----------------------------------------------------------
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Assignee: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 have improved the performance of
> job initialization, failover, and partition releasing. However, task
> deployment is still slow. For a job with two vertices, each with a
> parallelism of 8k and connected by an all-to-all edge, it takes 32.611s to
> deploy all the tasks and make them transition to running. If the parallelism
> is 16k, it takes more than 2 minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of
> the JobManager, the JobManager cannot handle other akka messages, such as
> heartbeats and task status updates, for more than two minutes.
>
> All in all, there are currently two issues in the deployment of tasks for
> large-scale jobs:
> # It takes a long time to deploy tasks, especially for all-to-all edges.
> # Heartbeat timeouts may happen during or after the procedure of task
> deployment. For a streaming job, this causes a failover of the entire
> region. The job may never transition to running, since another heartbeat
> timeout may occur while the new tasks are being deployed.
> h3. Proposal
> Task deployment involves the following procedures:
> # The JobManager creates a TaskDeploymentDescriptor for each task in the
> main thread
> # The TaskDeploymentDescriptor is serialized in the future executor
> # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC
> calls
> # The TaskExecutors create a new task thread and execute it
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the
> IntermediateResultPartitions that a task consumes. For downstream vertices
> connected by an all-to-all edge with parallelism _N_, the _N_
> ShuffleDescriptors are currently calculated _N_ times, once per downstream
> vertex. However, these vertices all consume the same
> IntermediateResultPartitions and therefore share the same
> ShuffleDescriptors, so there is no need to calculate them for each
> downstream vertex individually; we can simply cache them. This decreases
> the overall complexity of calculating TaskDeploymentDescriptors from O(N^2)
> to O(N).
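> The caching idea can be sketched as below. This is an illustrative sketch,
> not Flink's actual API: the names (ShuffleDescriptorCache, getOrCompute,
> computeDescriptors) are hypothetical, and plain strings stand in for the
> real descriptor objects.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: all consumers of an all-to-all intermediate result
// share one ShuffleDescriptor list, so it is computed once per result
// instead of once per consumer.
public class ShuffleDescriptorCache {
    private final Map<String, List<String>> cache = new HashMap<>();
    private int computations = 0;

    // Returns the cached descriptors for the given intermediate result,
    // computing them only on the first request.
    public List<String> getOrCompute(String resultId, int parallelism) {
        return cache.computeIfAbsent(resultId, id -> {
            computations++;
            return computeDescriptors(id, parallelism);
        });
    }

    // Placeholder for the O(N) work of building one descriptor per partition.
    private List<String> computeDescriptors(String resultId, int parallelism) {
        return java.util.stream.IntStream.range(0, parallelism)
                .mapToObj(i -> resultId + "-partition-" + i)
                .collect(java.util.stream.Collectors.toList());
    }

    public int getComputations() {
        return computations;
    }

    public static void main(String[] args) {
        ShuffleDescriptorCache cache = new ShuffleDescriptorCache();
        int parallelism = 8000;
        // All 8000 downstream tasks ask for the same descriptor list...
        for (int task = 0; task < parallelism; task++) {
            cache.getOrCompute("result-1", parallelism);
        }
        // ...but it is computed only once: O(N) total instead of O(N^2).
        System.out.println("computations=" + cache.getComputations());
    }
}
```

> All _N_ consumers hit the same cache entry, so the descriptor list is built
> once rather than once per downstream vertex.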
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_
> times, so we can cache the serialized value of the ShuffleDescriptors
> instead of the original objects. To decrease the size of akka messages and
> reduce the transmission of replicated data over the network, the serialized
> value can be compressed.
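> A minimal sketch of caching the compressed serialized value is shown below.
> It uses {{java.util.zip}}'s Deflater as the codec, which is an assumption
> for illustration; the class name (CompressedValueCache) is hypothetical and
> a byte array stands in for the real serialized ShuffleDescriptors.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;

// Hypothetical sketch: compress the serialized value once and cache the
// compressed bytes, so the same payload is never re-serialized or
// re-compressed for each downstream task.
public class CompressedValueCache {
    private byte[] cachedCompressed; // compressed serialized descriptors

    // Compresses on the first call only; later calls return the cached bytes.
    public byte[] getCompressed(byte[] serialized) throws IOException {
        if (cachedCompressed == null) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DeflaterOutputStream dos = new DeflaterOutputStream(bos)) {
                dos.write(serialized);
            }
            cachedCompressed = bos.toByteArray();
        }
        return cachedCompressed;
    }

    public static void main(String[] args) throws IOException {
        // Descriptor-like payload: highly repetitive, so it compresses well.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 8000; i++) {
            sb.append("result-1-partition-").append(i).append(";");
        }
        byte[] serialized = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = new CompressedValueCache().getCompressed(serialized);
        System.out.println("serialized=" + serialized.length
                + " compressed=" + compressed.length);
    }
}
```

> The repetitive structure of descriptor payloads is what makes the
> compression worthwhile on top of the caching.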
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of the
> serialized value is more than 700 Kilobytes. After compression, it shrinks
> to roughly 200 Kilobytes. Still, the overall size of 8k
> TaskDeploymentDescriptors is more than 1.6 Gigabytes. Since Akka cannot
> send the messages as fast as the TaskDeploymentDescriptors are created,
> they pile up on the heap and become a heavy burden for the garbage
> collector.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are
> distributed via the blob server if their sizes exceed a certain threshold
> (defined by {{blob.offload.minsize}}). TaskExecutors request the
> information from the blob server once they begin to process the
> TaskDeploymentDescriptor. This ensures that the JobManager doesn't need to
> keep all the copies in heap memory until all the TaskDeploymentDescriptors
> are sent; there is only one copy on the blob server. Like JobInformation,
> the cached ShuffleDescriptors can be distributed via the blob server if
> their overall size exceeds the threshold.
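> The offload decision can be sketched as below. This is an illustrative
> sketch, not Flink's actual code path: the class name (OffloadDecision) is
> hypothetical, and the 1 MiB threshold is an assumed stand-in for the value
> configured via {{blob.offload.minsize}}.

```java
// Hypothetical sketch of the offload decision: if the compressed serialized
// descriptors exceed the configured threshold, upload them to the blob
// server and transport only a blob key via RPC; otherwise send them inline.
public class OffloadDecision {
    // Stand-in for the configured blob.offload.minsize value (assumed 1 MiB).
    static final int BLOB_OFFLOAD_MINSIZE = 1024 * 1024;

    // Returns which transport path the payload would take.
    static String decide(byte[] compressed) {
        return compressed.length > BLOB_OFFLOAD_MINSIZE
                ? "offload-to-blob-server"
                : "send-inline-via-rpc";
    }

    public static void main(String[] args) {
        // ~200 KiB, like one vertex's compressed descriptors: stays inline.
        System.out.println(decide(new byte[200 * 1024]));
        // A larger aggregated payload crosses the threshold: offloaded.
        System.out.println(decide(new byte[2 * 1024 * 1024]));
    }
}
```

> With the payload offloaded, every TaskExecutor fetches the single blob-server
> copy instead of each RPC message carrying its own copy.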
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for
> the TaskDeploymentDescriptor. We cache the compressed serialized value of
> the ShuffleDescriptors, and if its size exceeds a certain threshold, the
> value is distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to evaluate the
> performance of our optimization. We chose a streaming job for the
> experiment because no task starts running until all tasks are deployed,
> which rules out other disturbing factors. The job contains two vertices: a
> source and a sink, connected by an all-to-all edge.
> The results below show the time interval between the timestamp of the first
> task transitioning to _deploying_ and the timestamp of the last task
> transitioning to _running_:
> ||Parallelism||Before||After ||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|
--
This message was sent by Atlassian Jira
(v8.3.4#803005)