[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375416#comment-17375416 ]

Zhilong Hong commented on FLINK-23218:
--------------------------------------

Thank you for your reply, [~trohrmann]. 
 # As for the compression step, the main concern is the time this procedure 
costs. We have tested it in several scenarios.
 ## For vertices connected by an all-to-all edge, when parallelism = 1000, 
serializing and compressing the ShuffleDescriptors takes 21 ms.
 ## When parallelism = 8000, it takes 139 ms. It's worth noting that, thanks 
to the cache, we only need to compress them once.
 ## For a pointwise edge, when parallelism = 8000, the compression takes 
less than 1 ms.
 ## As for the decompression that happens on the TaskManager, when 
parallelism = 8000 and the edge is all-to-all, decompressing and 
deserializing the ShuffleDescriptors takes less than 1 ms.
 ## So we think the compression step introduces no drawback, for both 
small-scale and large-scale jobs.
 # However, there's one blocker on which we'd like to ask your opinion.
 ## Distributing the ShuffleDescriptors via the blob server aims to solve a 
GC issue: since Akka cannot send the messages as fast as the 
TaskDeploymentDescriptors are created, the cache becomes a heavy burden for 
the garbage collector to deal with.
 ## While trying to distribute the ShuffleDescriptors via the blob server, 
we found that it can become a disaster if the blobs are not cleaned up. When 
a large number of failovers happen, a lot of cached blobs accumulate on the 
local disk. In extreme cases, the blobs could exhaust the disk space.
 ## It's worth noting that JobInformation and TaskInformation have the same 
issue. In OLAP scenarios, if many jobs are submitted, the blob cache on the 
TaskManager stays around until one hour after each job finishes.
 ## So we need to remove blobs from the cache once they become useless. For 
the JobManager, we can remove them once a failover happens or the partition 
is released. But for the TaskManager, the blob cache is harder to deal with. 
At present, a cached blob is removed one hour after the job finishes.
 ## We propose the following: each TaskManager's blob cache syncs the status 
of all blobs with the blob server every 5 minutes (the interval is 
configurable). If a blob has been removed from the blob server, it is 
removed from the blob cache as well. A minimal sketch of this idea follows 
below.
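
As a concrete illustration, here is a minimal sketch of the proposed sync 
task, assuming a single-threaded scheduler on the TaskManager side. All 
names (LocalBlobCache, BlobServerGateway, and so on) are hypothetical 
placeholders, not Flink's actual classes:
{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodically reconcile the TaskManager's local blob cache with the blob
// server, so that blobs deleted on the server are also deleted locally.
public class BlobCacheSyncTask {

    /** Placeholder for the local blob cache on the TaskManager. */
    interface LocalBlobCache {
        Set<String> cachedBlobKeys();
        void delete(String blobKey);
    }

    /** Placeholder for an RPC view of the blob server. */
    interface BlobServerGateway {
        /** Returns the keys of all blobs that still exist on the server. */
        Set<String> liveBlobKeys();
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start(LocalBlobCache cache, BlobServerGateway server,
                      long intervalMinutes) {
        scheduler.scheduleWithFixedDelay(() -> {
            Set<String> live = server.liveBlobKeys();
            // Iterate over a copy so deletions don't affect the iteration.
            for (String key : new HashSet<>(cache.cachedBlobKeys())) {
                if (!live.contains(key)) {
                    cache.delete(key); // removed on server -> remove locally
                }
            }
        }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }
}
{code}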

We're looking forward to your suggestions and opinions on this blocker. 
Thank you in advance.

> Distribute the ShuffleDescriptors via blob server
> -------------------------------------------------
>
>                 Key: FLINK-23218
>                 URL: https://issues.apache.org/jira/browse/FLINK-23218
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Priority: Major
>             Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 have so far improved the 
> performance of job initialization, failover, and partition releasing. 
> However, task deployment is still slow. Consider a job with two vertices, 
> each with a parallelism of 8k, connected by an all-to-all edge: it takes 
> 32.611s to deploy all the tasks and transition them to running. If the 
> parallelisms are 16k, it may take more than 2 minutes.
> Since the creation of the TaskDeploymentDescriptors runs in the main 
> thread of the JobManager, the JobManager cannot deal with other Akka 
> messages, such as heartbeats and task status updates, for more than two 
> minutes.
>  
> All in all, there are currently two issues in the deployment of tasks for 
> large-scale jobs:
>  # It takes a long time to deploy tasks, especially for all-to-all edges.
>  # Heartbeat timeouts may happen during or after the task deployment 
> procedure. For a streaming job, this causes a failover of the entire 
> region. The job may never transition to running, since another heartbeat 
> timeout would occur during the deployment of the new tasks.
> h3. Proposal
> Task deployment involves the following procedures:
>  # The JobManager creates a TaskDeploymentDescriptor for each task in the 
> main thread
>  # The TaskDeploymentDescriptor is serialized in the future executor (see 
> the sketch after this list)
>  # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via 
> RPC calls
>  # The TaskExecutors create a new task thread and execute it
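> As a generic sketch of step 2, the serialization work can be handed to a 
> dedicated executor so the main thread stays free for other messages. The 
> names below are hypothetical placeholders, not Flink's actual deployment 
> classes:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.Executor;
> 
> // Serialize a descriptor off the main thread via a supplied executor.
> public final class AsyncSerialization {
> 
>     static byte[] serialize(Serializable descriptor) throws IOException {
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
>             oos.writeObject(descriptor);
>         }
>         return bos.toByteArray();
>     }
> 
>     public static CompletableFuture<byte[]> serializeAsync(
>             Serializable descriptor, Executor futureExecutor) {
>         return CompletableFuture.supplyAsync(() -> {
>             try {
>                 return serialize(descriptor);
>             } catch (IOException e) {
>                 throw new RuntimeException(e);
>             }
>         }, futureExecutor);
>     }
> }
> {code}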
> The optimization contains two parts:
> *1. Cache the compressed serialized value of ShuffleDescriptors*
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
> IntermediateResultPartitions that a task consumes. For downstream vertices 
> connected by an all-to-all edge with parallelism _N_, we currently 
> calculate the _N_ ShuffleDescriptors _N_ times, once per downstream 
> vertex. However, these vertices all share the same ShuffleDescriptors, 
> since they consume the same IntermediateResultPartitions. We don't need to 
> calculate the ShuffleDescriptors for each downstream vertex individually; 
> we can just cache them. This decreases the overall complexity of 
> calculating the TaskDeploymentDescriptors from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ 
> times, so we can cache the serialized value of the ShuffleDescriptors 
> instead of the original objects. To decrease the size of the Akka messages 
> and reduce the transmission of duplicated data over the network, the 
> serialized value can be compressed, as sketched below.
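> A minimal sketch of this caching, assuming standard Java serialization and 
> java.util.zip compression (which may differ from the codec actually used 
> in Flink); the class name is a hypothetical placeholder:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.zip.Deflater;
> import java.util.zip.DeflaterOutputStream;
> 
> // Serialize and compress the shared ShuffleDescriptors once, then reuse
> // the compressed bytes for every downstream task.
> public final class CompressedDescriptorCache {
> 
>     private byte[] compressedBytes; // filled lazily, shared afterwards
> 
>     /** Returns the cached bytes, computing them on the first call only. */
>     public synchronized byte[] getOrCompute(Serializable descriptors)
>             throws IOException {
>         if (compressedBytes == null) {
>             ByteArrayOutputStream bos = new ByteArrayOutputStream();
>             try (ObjectOutputStream oos = new ObjectOutputStream(
>                     new DeflaterOutputStream(bos, new Deflater()))) {
>                 oos.writeObject(descriptors);
>             }
>             compressedBytes = bos.toByteArray();
>         }
>         return compressedBytes;
>     }
> }
> {code}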
> *2. Distribute the ShuffleDescriptors via blob server*
> For the ShuffleDescriptors of vertices with 8k parallelism, the size of 
> the serialized value is more than 700 KB. After compression, it shrinks to 
> roughly 200 KB. The overall size of the 8k TaskDeploymentDescriptors is 
> more than 1.6 GB. Since Akka cannot send the messages as fast as the 
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors 
> become a heavy burden for the garbage collector to deal with.
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
> already distributed via the blob server if their sizes exceed a certain 
> threshold (defined by {{blob.offload.minsize}}). TaskExecutors request the 
> information from the blob server once they begin to process the 
> TaskDeploymentDescriptor. This makes sure the JobManager doesn't need to 
> keep all the copies in heap memory until all the TaskDeploymentDescriptors 
> are sent; there is only one copy, on the blob server. Like JobInformation, 
> we can distribute the cached ShuffleDescriptors via the blob server once 
> their overall size exceeds the threshold, as sketched below.
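> A minimal sketch of this threshold decision, with hypothetical placeholder 
> types (only {{blob.offload.minsize}} is an actual Flink option):
> {code:java}
> // Ship the compressed ShuffleDescriptors inline if they are small,
> // otherwise upload them once to the blob server and ship only the key.
> public final class ShuffleDescriptorOffloading {
> 
>     /** Placeholder for the blob-server upload path. */
>     interface BlobWriter {
>         /** Uploads the bytes; returns a key TaskExecutors can fetch by. */
>         String putBlob(byte[] bytes);
>     }
> 
>     /** Either the raw bytes (inline) or a blob key (offloaded). */
>     static final class MaybeOffloaded {
>         final byte[] inlineBytes; // non-null if sent inline
>         final String blobKey;     // non-null if offloaded
> 
>         MaybeOffloaded(byte[] inlineBytes, String blobKey) {
>             this.inlineBytes = inlineBytes;
>             this.blobKey = blobKey;
>         }
>     }
> 
>     static MaybeOffloaded offloadIfLarge(
>             byte[] compressedBytes, long offloadMinSize, BlobWriter writer) {
>         if (compressedBytes.length < offloadMinSize) {
>             // Small enough: send directly inside the descriptor.
>             return new MaybeOffloaded(compressedBytes, null);
>         }
>         // Large: one copy on the blob server; TaskExecutors fetch and
>         // cache it when they process the TaskDeploymentDescriptor.
>         return new MaybeOffloaded(null, writer.putBlob(compressedBytes));
>     }
> }
> {code}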
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for 
> the TaskDeploymentDescriptor. We cache the compressed serialized value of 
> ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
> value would be distributed via the blob server.
> h3. Comparison
> We implemented a POC and conducted an experiment to compare the 
> performance before and after the optimization. We chose a streaming job 
> for the experiment because no task starts running until all tasks are 
> deployed, which avoids other disturbing factors. The job contains two 
> vertices, a source and a sink, connected by an all-to-all edge.
> The results below show the time interval between the timestamp of the 
> first task transitioning to _deploying_ and the timestamp of the last task 
> transitioning to _running_:
> ||Parallelism||Before||After||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|


