[
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Till Rohrmann reassigned FLINK-15031:
-------------------------------------
Assignee: Zhu Zhu
> Calculate required shuffle memory before allocating slots in resources
> specified cases
> --------------------------------------------------------------------------------------
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare its required
> resources before using them. In this way, no resource related error should
> happen as long as no operator uses more than it declared. This ensures a
> deployed task does not fail due to insufficient resources in the TM. Such
> failures are unnecessary and may even cause a job to hang forever, failing
> repeatedly on deploying tasks to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> require a minimum number of network buffers to work. Currently a task can be
> deployed to a TM with insufficient network buffers, and then fails on
> launching.
> To avoid that, we should calculate required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex)
> is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannels *
> buffersPerChannel) + required buffers for result partition buffer
> pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle
> services, currently there is no way to get the required shuffle memory of a
> task.
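
A minimal sketch of the buffer formula quoted above, for the NettyShuffleService case only. The class and method names are hypothetical and not part of Flink; the buffer size parameter is an assumption introduced here to turn a buffer count into bytes, and the sketch covers a single result partition, as the formula does.

```java
// Hypothetical helper, not a Flink class: it only restates the quoted formula.
final class ShuffleBufferEstimate {

    /** Exclusive input channel buffers plus the result partition buffer pool minimum. */
    static int requiredBuffers(
            int numInputChannels, int buffersPerChannel, int numberOfSubpartitions) {
        int inputBuffers = Math.multiplyExact(numInputChannels, buffersPerChannel);
        int outputBuffers = Math.addExact(numberOfSubpartitions, 1);
        return Math.addExact(inputBuffers, outputBuffers);
    }

    /** Converts a buffer count to bytes; bufferSizeBytes is an assumed input. */
    static long requiredShuffleMemoryBytes(int requiredBuffers, int bufferSizeBytes) {
        return Math.multiplyExact((long) requiredBuffers, (long) bufferSizeBytes);
    }
}
```

For example, with 4 input channels, 2 buffers per channel and 3 subpartitions, requiredBuffers returns 4 * 2 + (3 + 1) = 12. Assuming 32 KiB buffers, that corresponds to 12 * 32768 = 393216 bytes.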
> To keep it simple under dynamic slot sharing, the required shuffle memory for
> a task should be the max required shuffle memory of all {{ExecutionVertex}}
> instances of the same {{ExecutionJobVertex}}. The required shuffle memory for
> a slot sharing group should then be the sum of the shuffle memory for each
> {{ExecutionJobVertex}} instance within it.
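
The aggregation rule described above could be sketched as follows. This is an illustration under assumptions, not Flink code: the class and method names are hypothetical, and each job vertex is represented simply as a list of per-subtask shuffle memory requirements in bytes.

```java
import java.util.List;

// Hypothetical helper, not a Flink class.
final class SlotSharingGroupShuffleMemory {

    /** Per job vertex: the max requirement over all of its parallel subtasks. */
    static long forJobVertex(List<Long> perSubtaskBytes) {
        long max = 0L;
        for (long bytes : perSubtaskBytes) {
            max = Math.max(max, bytes);
        }
        return max;
    }

    /** Per slot sharing group: the sum of the per job vertex maxima. */
    static long forSlotSharingGroup(List<List<Long>> perJobVertexSubtaskBytes) {
        long sum = 0L;
        for (List<Long> subtasks : perJobVertexSubtaskBytes) {
            sum = Math.addExact(sum, forJobVertex(subtasks));
        }
        return sum;
    }
}
```

Taking the max per job vertex means any subtask of that vertex fits in any slot of the group, which is what makes the estimate safe when it is not known in advance which subtasks will share a slot. The cost is that slots may reserve more shuffle memory than the subtasks placed in them actually need.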
--
This message was sent by Atlassian Jira
(v8.3.4#803005)