[
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu updated FLINK-15031:
----------------------------
Description:
In resources specified cases, we expect each operator to declare required
resources and before using them. In this way, no resource related error should
happen if no resource is used more than declared. This ensures a deployed task
would not fail due to insufficient resources in TM. This may result in
unnecessary failures and may even cause a job hanging forever, failing
repeatedly on deploying tasks to a TM with insufficient resources.
Shuffle memory is the last missing piece for this goal at the moment. Minimum
network buffers are required by tasks to work. Currently a task is possible to
be deployed to a TM with insufficient network buffers, and fails on launching.
To avoid that, we should calculate required network memory for a
task/SlotSharingGroup before allocating a slot for it.
The required shuffle memory can be derived from the number of required network
buffers. The number of buffers required by a task (ExecutionVertex) is
{code:java}
exclusive buffers for input channels(i.e. numInputChannel * buffersPerChannel)
+ required buffers for result partition buffer pool(currently is
numberOfSubpartitions + 1)
{code}
Note that this is the {{NettyShuffleService}} case. For custom shuffle
services, currently there is no way to get the required shuffle memory of a
task.
To make it simple under dynamic slot sharing, the required shuffle memory for a
task should be the max required shuffle memory of all {{ExecutionVertex}} of
the same {{ExecutionJobVertex}}. And the required shuffle memory for a slot
sharing group should be the sum of shuffle memory for each
{{ExecutionJobVertex}} instance within.
was:
In resources specified cases, we expect each operator to declare required
resources and before using them. In this way, no resource related error should
happen if no resource is used more than declared. This ensures a deployed task
would not fail due to insufficient resources in TM. This may result in
unnecessary failures and may even cause a job hanging forever, failing
repeatedly on deploying tasks to a TM with insufficient resources.
Shuffle memory is the last missing piece for this goal at the moment. Minimum
network buffers are required by tasks to work. Currently a task is possible to
be deployed to a TM with insufficient network buffers, and fails on launching.
To avoid that, we should calculate required network memory for a
task/SlotSharingGroup and set the result in {{ResourceProfile}} before
allocating a slot for it.
> Calculate required shuffle memory cases before allocating slots in resources
> specified
> --------------------------------------------------------------------------------------
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare required
> resources and before using them. In this way, no resource related error
> should happen if no resource is used more than declared. This ensures a
> deployed task would not fail due to insufficient resources in TM. This may
> result in unnecessary failures and may even cause a job hanging forever,
> failing repeatedly on deploying tasks to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum
> network buffers are required by tasks to work. Currently a task is possible
> to be deployed to a TM with insufficient network buffers, and fails on
> launching.
> To avoid that, we should calculate required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex)
> is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel *
> buffersPerChannel) + required buffers for result partition buffer
> pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is the {{NettyShuffleService}} case. For custom shuffle
> services, currently there is no way to get the required shuffle memory of a
> task.
> To make it simple under dynamic slot sharing, the required shuffle memory for
> a task should be the max required shuffle memory of all {{ExecutionVertex}}
> of the same {{ExecutionJobVertex}}. And the required shuffle memory for a
> slot sharing group should be the sum of shuffle memory for each
> {{ExecutionJobVertex}} instance within.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)