[
https://issues.apache.org/jira/browse/FLINK-23005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386867#comment-17386867
]
Zhilong Hong edited comment on FLINK-23005 at 8/5/21, 9:07 AM:
---------------------------------------------------------------
We compared several compression methods implemented in the Java standard library
and in [Apache Commons Compress|https://commons.apache.org/proper/commons-compress]
(located in the package {{org.apache.commons.compress.compressors}}). The
comparison is conducted in a unit test that compresses 8,000 shuffle
descriptors for 1,000 iterations. Here's the result:
||Method||Memory||Time (1,000 iterations)||
|No compression|745.768 KiB|27.049s|
|Deflater|169.127 KiB|143.685s|
|GZIP (Java)|169.094 KiB|143.798s|
|GZIP (Apache)|169.069 KiB|150.080s|
|BZIP2|160.589 KiB|185.515s|
|Snappy|191.511 KiB|44.104s|
|LZ4|190.407 KiB|1336.252s|
Other compression methods, such as LZMA, Pack200, XZ, and Zstandard, take
longer than one minute to run a single iteration, so they are excluded from
the comparison.
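For reference, here is a minimal sketch of how such a micro-benchmark could be
set up with {{java.util.zip.Deflater}}. The synthetic, highly redundant payload
is an assumption for illustration; the actual test compresses the serialized
ShuffleDescriptors:
{code:java}
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

public class CompressionBenchmark {

    public static void main(String[] args) {
        // Stand-in payload roughly the size of 8,000 serialized shuffle
        // descriptors (745.768 KiB in the table above). Real descriptors
        // are highly redundant, which is what makes Deflater effective.
        byte[] payload = new byte[763_666];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (byte) (i % 64);
        }

        long start = System.nanoTime();
        byte[] compressed = null;
        for (int i = 0; i < 1_000; i++) {
            compressed = deflate(payload);
        }
        System.out.printf("Deflater: %d bytes, %.3fs for 1,000 iterations%n",
                compressed.length, (System.nanoTime() - start) / 1e9);
    }

    private static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }
}
{code}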
> Cache the compressed serialized value of ShuffleDescriptors
> -----------------------------------------------------------
>
> Key: FLINK-23005
> URL: https://issues.apache.org/jira/browse/FLINK-23005
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Assignee: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> h3. Introduction
> The optimizations introduced in FLINK-21110 have so far improved the
> performance of job initialization, failover, and partition releasing.
> However, task deployment is still slow. For a job with two vertices, each
> with a parallelism of 8,000 and connected by an all-to-all edge, it takes
> 32.611s to deploy all the tasks and transition them to running. If the
> parallelisms are 16,000, it may take more than two minutes.
> Since the creation of TaskDeploymentDescriptors runs in the main thread of
> the JobManager, the JobManager cannot handle other Akka messages, such as
> heartbeats and task status updates, for more than two minutes.
>
> All in all, there are currently two issues in the deployment of tasks for
> large-scale jobs:
> # It takes a long time to deploy tasks, especially for all-to-all edges.
> # Heartbeat timeouts may happen during or after the task deployment
> procedure. For a streaming job, this causes a failover of the entire region.
> The job may never transition to running, since another heartbeat timeout may
> occur during the deployment of the new tasks.
> h3. Proposal
> Task deployment involves the following procedures:
> # The JobManager creates a TaskDeploymentDescriptor for each task in the
> main thread
> # The TaskDeploymentDescriptor is serialized in the future executor
> # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC
> calls
> # The TaskExecutors create a new task thread and execute it
>
> The optimization contains three parts:
> 1. Cache the compressed serialized value of ShuffleDescriptors (see the
> sketch after this list)
> ShuffleDescriptors in the TaskDeploymentDescriptor describe the
> IntermediateResultPartitions that a task consumes. For downstream vertices
> connected by an all-to-all edge with parallelism _N_, we currently calculate
> the same _N_ ShuffleDescriptors _N_ times. However, these vertices share the
> same ShuffleDescriptors, since they all consume the same
> IntermediateResultPartitions. Instead of calculating the ShuffleDescriptors
> for each downstream vertex individually, we can simply cache them. This
> decreases the overall complexity of calculating TaskDeploymentDescriptors
> from O(N^2) to O(N).
> Furthermore, we don't need to serialize the same ShuffleDescriptors _N_
> times, so we can cache the serialized value of the ShuffleDescriptors
> instead of the original objects. To decrease the size of the Akka messages
> and reduce the transmission of duplicated data over the network, the
> serialized value can be compressed.
> 2. Distribute the ShuffleDescriptors via the blob server (see FLINK-23218)
> 3. Limit the size of ShuffleDescriptors in the PermanentBlobCache on the
> TaskExecutor (see FLINK-23354)
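> A minimal sketch of the caching idea from step 1, assuming Java
> serialization and {{java.util.zip}} compression; the class and method names
> here are hypothetical and do not reflect Flink's actual implementation:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.zip.DeflaterOutputStream;
>
> public class ShuffleDescriptorCache {
>
>     // Computed once per IntermediateResult, then shared by all N consumers
>     // instead of being recomputed and reserialized N times.
>     private byte[] compressedSerializedValue;
>
>     public synchronized byte[] getOrCompute(Serializable shuffleDescriptors)
>             throws IOException {
>         if (compressedSerializedValue == null) {
>             ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>             // Closing the ObjectOutputStream finishes the Deflater stream.
>             try (ObjectOutputStream out =
>                     new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
>                 out.writeObject(shuffleDescriptors);
>             }
>             compressedSerializedValue = bytes.toByteArray();
>         }
>         return compressedSerializedValue;
>     }
> }
> {code}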
> h3. Summary
> In summary, the optimization of task deployment is to introduce a cache for
> the TaskDeploymentDescriptor. We cache the compressed serialized value of
> the ShuffleDescriptors. If the size of the value exceeds a certain
> threshold, the value is distributed via the blob server, as sketched below.
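> A hedged sketch of that threshold decision; the offloader class, the
> map-backed blob store, and the threshold parameter are illustrative
> stand-ins, not Flink's actual BlobServer API:
> {code:java}
> import java.util.UUID;
> import java.util.concurrent.ConcurrentHashMap;
>
> public class ShuffleDescriptorOffloader {
>
>     // Stand-in for the blob server: maps generated keys to stored values.
>     private final ConcurrentHashMap<UUID, byte[]> blobStore =
>             new ConcurrentHashMap<>();
>
>     /**
>      * Small values travel inline in the deployment descriptor; large
>      * values are stored once, and only the key is shipped to each
>      * TaskExecutor.
>      */
>     public Object maybeOffload(byte[] compressedValue, int thresholdBytes) {
>         if (compressedValue.length <= thresholdBytes) {
>             return compressedValue; // inline in the Akka message
>         }
>         UUID key = UUID.randomUUID();
>         blobStore.put(key, compressedValue);
>         return key; // TaskExecutors fetch the value from the store by key
>     }
> }
> {code}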
> h3. Comparison
> We implemented a POC and conducted an experiment to evaluate the performance
> of our optimization. We chose a streaming job for the experiment, because no
> task starts running until all tasks are deployed, which rules out other
> disturbing factors. The job contains two vertices: a source and a sink. They
> are connected with an all-to-all edge.
> The results below show the time interval between the timestamp of the first
> task transitioning to _deploying_ and the timestamp of the last task
> transitioning to _running_:
> ||Parallelism||Before||After||
> |8000*8000|32.611s|6.480s|
> |16000*16000|128.408s|19.051s|