[ 
https://issues.apache.org/jira/browse/FLINK-5052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-5052:
----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Changing the maximum parallelism (number of key groups) of a job
> ----------------------------------------------------------------
>
>                 Key: FLINK-5052
>                 URL: https://issues.apache.org/jira/browse/FLINK-5052
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Stefan Richter
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> Through dynamic rescaling, Flink jobs can already adjust their parallelism 
> and each operator only has to read its assigned key-groups. 
> However, the maximum parallelism is determined by the number of key-groups  
> (aka maxParallelism), which is currently fixed forever after the job is first 
> started. We could consider relaxing this limitation, so that users can 
> modify the number of key-groups after the fact, which is useful in particular 
> for upscaling jobs from older Flink versions (<1.2) which must be converted 
> with maxParallelism == parallelism.
> In the general case, changing the maxParallelism can shuffle keys between 
> key-groups. We would then have to read the complete state on each operator 
> instance and filter for those keys that actually fall into the key-groups 
> assigned to that instance. While it is certainly possible to support this, 
> it is obviously a very expensive operation.
> Fortunately, the assignment of keys to operators is currently determined as 
> follows:
> {{operatorInstance = computeKeyGroup(key) * parallelism / maxParallelism}}
> This means that we can provide more efficient support for upscaling of 
> maxParallelism if {{newMaxParallelism == n * oldMaxParallelism}}. In this 
> case, keys are not reshuffled between key-groups, but key-groups are split by 
> a factor n instead. Restoring an operator instance for the new maxParallelism 
> then only has to read some of the old key-groups, which significantly reduces 
> the amount of unnecessary data transfer, e.g. ~1/2 for increasing 
> maxParallelism by a factor 2, ~2/3 when increasing by a factor 3, etc.
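As a rough illustration of why a multiple of the old maxParallelism splits key-groups instead of reshuffling keys, here is a minimal Java sketch. It assumes the key-group is computed as a hash of the key modulo maxParallelism. The class name, the helper names, and the use of plain hashCode() as the hash are stand-ins for illustration, not Flink's actual code.

```java
final class KeyGroupSplitSketch {

    // Hypothetical stand-in for the key-group function. The only property
    // relied on here is "hash(key) mod maxParallelism" with a hash that does
    // not depend on maxParallelism.
    static int computeKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Assignment formula as quoted in the description above.
    // Note: integer division; values are assumed small enough not to overflow.
    static int operatorInstance(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Old key-group that a new key-group was split from, assuming
    // newMaxParallelism == n * oldMaxParallelism.
    static int oldKeyGroupOf(int newKeyGroup, int oldMaxParallelism) {
        return newKeyGroup % oldMaxParallelism;
    }

    // True if the key's new key-group is one of the pieces of its old key-group.
    static boolean staysInSplit(Object key, int oldMaxParallelism, int factor) {
        int oldKeyGroup = computeKeyGroup(key, oldMaxParallelism);
        int newKeyGroup = computeKeyGroup(key, oldMaxParallelism * factor);
        return oldKeyGroupOf(newKeyGroup, oldMaxParallelism) == oldKeyGroup;
    }
}
```

Under this assumption, staysInSplit holds for every key and every integer factor, because (h mod (n * m)) mod m equals h mod m. If the new maxParallelism is not a multiple of the old one, that identity no longer holds and keys can move between unrelated key-groups.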
> Implementing this feature would require the following steps:
>       - Introduce/modify state handles with the capability to combine 
> multiple logical key-groups into one mixed physical entity.
>       - Enhance StateAssignmentOperation so that it can deal with and 
> correctly assign the new/modified keyed state handles to subtasks when 
> restoring a checkpoint. We also need to implement how to compute the correct 
> super-key-group, but this is rather simple.
>       - Filter out key clippings in the backends when restoring.
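The restore-side steps above could look roughly like the following Java sketch. It assumes a modulo-based key-group function and uses the assignment formula from the description. All class and method names are hypothetical, and this is not how StateAssignmentOperation or the state backends are actually written.

```java
import java.util.SortedSet;
import java.util.TreeSet;

final class RescaledRestoreSketch {

    // Hypothetical stand-in for the key-group function (hash modulo maxParallelism).
    static int computeKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // New key-groups owned by a subtask, derived from the formula in the
    // description: operatorInstance = keyGroup * parallelism / maxParallelism.
    static SortedSet<Integer> newKeyGroupsOf(int subtask, int parallelism, int newMaxParallelism) {
        SortedSet<Integer> owned = new TreeSet<>();
        for (int keyGroup = 0; keyGroup < newMaxParallelism; keyGroup++) {
            if (keyGroup * parallelism / newMaxParallelism == subtask) {
                owned.add(keyGroup);
            }
        }
        return owned;
    }

    // Old ("super") key-groups the subtask has to read, assuming
    // newMaxParallelism == factor * oldMaxParallelism.
    static SortedSet<Integer> oldKeyGroupsToRead(
            int subtask, int parallelism, int oldMaxParallelism, int factor) {
        SortedSet<Integer> toRead = new TreeSet<>();
        for (int newKeyGroup : newKeyGroupsOf(subtask, parallelism, oldMaxParallelism * factor)) {
            toRead.add(newKeyGroup % oldMaxParallelism);
        }
        return toRead;
    }

    // Filter applied while restoring: keep a key only if its new key-group
    // is assigned to this subtask.
    static boolean keepOnRestore(Object key, int subtask, int parallelism, int newMaxParallelism) {
        int newKeyGroup = computeKeyGroup(key, newMaxParallelism);
        return newKeyGroup * parallelism / newMaxParallelism == subtask;
    }
}
```

For example, with an old maxParallelism of 4, a factor of 2 and a parallelism of 8, subtask 5 owns new key-group 5 in this sketch. It would read only old key-group 1 and drop the keys that now belong to new key-group 1.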



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
