[
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385644#comment-17385644
]
Zhu Zhu commented on FLINK-23218:
---------------------------------
Thanks for confirming! [~trohrmann]
And thanks for the explanation for the transient blob option. I think you are
right that we can try re-offload {{JobInformation}}, {{TaskInformation}} and
{{ShuffleDescriptors}} before deploying a task. It may need some extra efforts
though to track and de-duplicate blobs on BlobServer. So in the first step we
will try introducing a {{read()}} API in {{PermanentBlobService}} which might
be simpler.
> Distribute the ShuffleDescriptors via blob server
> -------------------------------------------------
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> _This is the part 2 of the optimization related to task deployments. For more
> details about the overall description and the part 1, please see FLINK-23005._
> For ShuffleDescriptors of vertices with 8k parallelism, the size of their
> serialized value is more than 700 Kilobytes. After the compression, it would
> be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is
> more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the
> TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would
> become a heavy burden for the garbage collector to deal with.
> In TaskDeploymentDescriptor, JobInformation and TaskInformation are
> distributed via the blob server if their sizes exceed a certain threshold
> (which is defined as {{blob.offload.minsize}}). TaskExecutors request the
> information from the blob server once they begin to process the
> TaskDeploymentDescriptor. This make sure that JobManager don't need to keep
> all the copies in the heap memory until the TaskDeploymentDescriptors are all
> sent. There will be only one copy in the blob server. Like the
> JobInformation, we can just distribute the cached ShuffleDescriptors via the
> blob server if their overall size has exceeded the threshold.
> This improvement can help to avoid the long-term garbage collection during
> task deployment.
> The cached ShuffleDescriptors in the blob server will be removed once the
> partitions related to them are no longer valid. This makes sure the blob
> server won't be full of cached ShuffleDescriptors, even there's a long
> running session on the cluster.
> In the part 3 we will limit the size of ShuffleDescriptors in
> PermanentBlobCache on TaskExecutor. This makes sure out of space won't happen
> on the TaskExecutor because of cached ShuffleDescriptors. For more details
> please see FLINK-23354.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)