[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reopened FLINK-15031:
-----------------------------
      Assignee: Jin Xing  (was: Zhu Zhu)

> Automatically calculate required shuffle memory for fine-grained jobs
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Jin Xing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> required resources before using them. In this way, no resource related error 
> should happen if resources are not used beyond what was declared. This 
> ensures a deployed task would not fail due to insufficient resources in TM, 
> which may result in unnecessary failures and may even cause a job hanging 
> forever, failing repeatedly on deploying tasks to a TM with insufficient 
> resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum 
> network buffers are required by tasks to work. Currently a task is possible 
> to be deployed to a TM with insufficient network buffers, and fails on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * 
> buffersPerChannel) + required buffers for result partition buffer 
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the max required shuffle memory of all {{ExecutionVertex}} 
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a 
> slot sharing group should be the sum of shuffle memory for each 
> {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to