[ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352374#comment-17352374 ]
Xintong Song commented on FLINK-15031:
--------------------------------------
Thanks for reviving this discussion, [[email protected]].
+1 on moving this ticket forward. We are planning to expose user APIs
(DataStream) for fine-grained resource management in the 1.14 release cycle. It
would help if users did not need to understand the internal details of the
shuffle service or manually configure the network memory requirements.
The philosophy behind fine-grained resource management is to make resource
requirements of every slot explicit. It is very important that tasks in the
slot never use more resources than the declared requirement. To that end,
declaring requirements that also cover the floating buffers sounds like the
right approach to me.
As for efficiency, AFAIK the amount of floating buffers is configurable. That
means users can flexibly decide how many extra buffers, in addition to the
minimum requirement, to reserve. That sounds good enough to me.
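To make the idea concrete, here is a rough sketch of an input-side requirement
that covers floating buffers as well. It is not Flink code: the class and
method names are hypothetical, it assumes that exclusive buffers are counted
per input channel and floating buffers per input gate, and the numbers in
main are only example values.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class NetworkMemoryWithFloatingBuffers {

    /**
     * Input-side network memory in bytes, assuming exclusive buffers are
     * reserved per channel and floating buffers per gate.
     */
    static long requiredBytes(
            int numInputChannels,
            int numInputGates,
            int buffersPerChannel,
            int floatingBuffersPerGate,
            int segmentSizeBytes) {
        long exclusive = (long) numInputChannels * buffersPerChannel;
        long floating = (long) numInputGates * floatingBuffersPerGate;
        return (exclusive + floating) * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // Example values: 10 channels over 2 gates, 2 exclusive buffers per
        // channel, 8 floating buffers per gate, 32 KiB per buffer.
        System.out.println(requiredBytes(10, 2, 2, 8, 32 * 1024));
    }
}
```

Lowering floatingBuffersPerGate shrinks the declared requirement towards the
minimum, which is the trade-off mentioned above.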
> Calculate required shuffle memory before allocating slots if resources are
> specified
> ------------------------------------------------------------------------------------
>
> Key: FLINK-15031
> URL: https://issues.apache.org/jira/browse/FLINK-15031
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare
> required resources before using them. In this way, no resource-related error
> should happen as long as resources are not used beyond what was declared.
> This ensures that a deployed task does not fail because of insufficient
> resources in the TM. Such failures are unnecessary and can even leave a job
> hanging forever, failing repeatedly while deploying tasks to a TM with
> insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> need a minimum number of network buffers to work. Currently a task can be
> deployed to a TM with insufficient network buffers and then fail on
> launch.
> To avoid that, we should calculate required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel *
> buffersPerChannel) + required buffers for the result partition buffer
> pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle
> services, currently there is no way to get the required shuffle memory of a
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for
> a task should be the maximum required shuffle memory over all
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The
> required shuffle memory for a slot sharing group should then be the sum of
> the shuffle memory of each {{ExecutionJobVertex}} within it.
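
The rule quoted above can be sketched roughly as follows. This is an
illustration, not the Flink implementation: the class and method names are
hypothetical, each subtask is modelled as an int[] of {numInputChannels,
numberOfSubpartitions}, and converting buffers to bytes by multiplying with a
fixed segment size is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the quoted rule; not part of Flink. */
final class ShuffleMemorySketch {

    /** Buffers for one ExecutionVertex, following the quoted formula. */
    static int requiredBuffers(
            int numInputChannels, int buffersPerChannel, int numberOfSubpartitions) {
        return numInputChannels * buffersPerChannel + numberOfSubpartitions + 1;
    }

    /** Maximum over all subtasks (ExecutionVertex) of one job vertex. */
    static int requiredBuffersForJobVertex(List<int[]> subtasks, int buffersPerChannel) {
        int max = 0;
        for (int[] s : subtasks) {
            max = Math.max(max, requiredBuffers(s[0], buffersPerChannel, s[1]));
        }
        return max;
    }

    /** Sum over the job vertices of a slot sharing group, converted to bytes. */
    static long requiredBytesForGroup(
            List<List<int[]>> jobVertices, int buffersPerChannel, int segmentSizeBytes) {
        long totalBuffers = 0;
        for (List<int[]> jobVertex : jobVertices) {
            totalBuffers += requiredBuffersForJobVertex(jobVertex, buffersPerChannel);
        }
        return totalBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // Job vertex A: two subtasks with 4 and 6 input channels, 3 subpartitions each.
        List<int[]> a = Arrays.asList(new int[] {4, 3}, new int[] {6, 3});
        // Job vertex B: one subtask with 3 input channels and no subpartitions.
        List<int[]> b = Collections.singletonList(new int[] {3, 0});
        // Example values: 2 buffers per channel, 32 KiB per buffer.
        System.out.println(requiredBytesForGroup(Arrays.asList(a, b), 2, 32 * 1024));
    }
}
```

Taking the maximum per job vertex means every slot of the group can host any
subtask of that vertex, at the cost of over-reserving for the smaller ones.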
--
This message was sent by Atlassian Jira
(v8.3.4#803005)