[
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu updated FLINK-15031:
----------------------------
Summary: Automatically calculate required network memory for fine-grained
jobs (was: Automatically calculate required shuffle memory for fine-grained
jobs)
> Automatically calculate required network memory for fine-grained jobs
> ---------------------------------------------------------------------
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Jin Xing
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare
> its required resources before using them. This way, no resource-related error
> should occur as long as resources are not used beyond what was declared. This
> ensures that a deployed task does not fail due to insufficient resources in
> the TM. Such failures are unnecessary and may even cause a job to hang
> forever, failing repeatedly while deploying tasks to a TM with insufficient
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> require a minimum number of network buffers to work. Currently, a task can be
> deployed to a TM with insufficient network buffers and then fail on
> launch.
> To avoid that, we should calculate the required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> requiredBuffers =
>     numInputChannel * buffersPerChannel  // exclusive buffers for input channels
>   + numberOfSubpartitions + 1            // required buffers for result partition buffer pool (current value)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle
> services, there is currently no way to get the required shuffle memory of a
> task.
> To keep things simple under dynamic slot sharing, the required shuffle memory
> for a task should be the maximum required shuffle memory across all
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The required
> shuffle memory for a slot sharing group should then be the sum of the shuffle
> memory of each {{ExecutionJobVertex}} instance within it.
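
For illustration only, the calculation described in the issue could be sketched roughly as follows. This is not Flink code: the class {{NetworkMemorySketch}} and all of its method names are hypothetical, and the 32 KiB buffer size and 2 buffers per channel used in {{main}} are assumed example values, not something stated in the issue.

```java
import java.util.List;

// Hypothetical sketch; none of these names exist in Flink.
public class NetworkMemorySketch {

    // Buffers required by one ExecutionVertex, following the formula in the issue:
    // exclusive input channel buffers + result partition buffer pool.
    static int requiredBuffers(int numInputChannels, int buffersPerChannel, int numberOfSubpartitions) {
        int exclusiveInputBuffers = numInputChannels * buffersPerChannel;
        int resultPartitionBuffers = numberOfSubpartitions + 1;
        return exclusiveInputBuffers + resultPartitionBuffers;
    }

    // Convert a buffer count into bytes for a given buffer size.
    static long requiredMemoryBytes(int requiredBuffers, int bufferSizeBytes) {
        return (long) requiredBuffers * bufferSizeBytes;
    }

    // Per ExecutionJobVertex: the maximum over all of its ExecutionVertex instances.
    static long jobVertexMemory(List<Long> perExecutionVertexBytes) {
        return perExecutionVertexBytes.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    // Per slot sharing group: the sum over all ExecutionJobVertex values in the group.
    static long slotSharingGroupMemory(List<Long> perJobVertexBytes) {
        return perJobVertexBytes.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024; // assumed buffer size in bytes
        int buffersPerChannel = 2;  // assumed example value

        // Two subtasks of the same job vertex with different input channel counts.
        long subtaskA = requiredMemoryBytes(requiredBuffers(4, buffersPerChannel, 3), bufferSize);
        long subtaskB = requiredMemoryBytes(requiredBuffers(6, buffersPerChannel, 3), bufferSize);
        long firstVertex = jobVertexMemory(List.of(subtaskA, subtaskB));

        // A second job vertex in the same slot sharing group with a single subtask.
        long secondVertex = jobVertexMemory(
                List.of(requiredMemoryBytes(requiredBuffers(2, buffersPerChannel, 1), bufferSize)));

        System.out.println(slotSharingGroupMemory(List.of(firstVertex, secondVertex)));
    }
}
```

Taking the maximum per {{ExecutionJobVertex}} overestimates for subtasks with fewer channels, but it means any subtask of that vertex fits into the slot regardless of which one is placed there, which is what keeps dynamic slot sharing simple.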