[ 
https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767397#comment-17767397
 ] 

Zhanghao Chen commented on FLINK-33123:
---------------------------------------

Hi [~fanrui], thanks for sharing the issues. For issue 1: directly updating the 
execution graph should also work. For issue 2: this is indeed a problem, and I 
don't know of a good solution to it if we use REBALANCE. I think it would be 
better to use RESCALE with some help from the autoscaling algorithm side. 
Internally, we chose to change the partitioner from FORWARD to RESCALE instead, 
for the following reasons:
 * REBALANCE uses considerably more network memory, which can lead to memory 
issues after rescaling, especially if the parallelism is large. It may also 
degrade performance due to the extra shuffle.
 * FORWARD and RESCALE are effectively interchangeable: they share the same 
shuffle behavior under the same upstream and downstream parallelism setting. 
This avoids the problems described in issue 2.
 * To address the concern that RESCALE might lead to imbalanced data on the 
downstream side, we introduced an improvement on the autoscaling algorithm side 
that makes the upstream and downstream task parallelisms multiples of each other.
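
To illustrate the last point, here is a minimal sketch of how a proposed 
parallelism could be snapped to a multiple or divisor of the neighboring 
vertex's parallelism. The class and the method alignParallelism are 
hypothetical names made up for this example. This is not the actual autoscaler 
code, only one possible way to enforce the "multiples of each other" constraint.

```java
final class ParallelismAlignmentSketch {

    /**
     * Hypothetical helper (not a Flink API): returns the smallest parallelism
     * >= proposed that is either a multiple or a divisor of neighborParallelism.
     */
    static int alignParallelism(int proposed, int neighborParallelism) {
        if (proposed < 1 || neighborParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (proposed >= neighborParallelism) {
            // Round up to the next multiple of the neighbor's parallelism.
            int factor = (proposed + neighborParallelism - 1) / neighborParallelism;
            return factor * neighborParallelism;
        }
        // Otherwise pick the smallest divisor of the neighbor's parallelism
        // that is not below the proposed value.
        for (int candidate = proposed; candidate < neighborParallelism; candidate++) {
            if (neighborParallelism % candidate == 0) {
                return candidate;
            }
        }
        return neighborParallelism;
    }

    public static void main(String[] args) {
        System.out.println(alignParallelism(5, 2)); // 6
        System.out.println(alignParallelism(3, 8)); // 4
        System.out.println(alignParallelism(4, 4)); // 4
    }
}
```

With an aligned pair such as 2 -> 6, every upstream task feeds the same number 
of downstream tasks, which is what keeps RESCALE balanced.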

> Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for 
> autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33123
>                 URL: https://issues.apache.org/jira/browse/FLINK-33123
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Runtime / Coordination
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Zhanghao Chen
>            Priority: Critical
>         Attachments: image-2023-09-20-15-09-22-733.png, 
> image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is 
> wrong when the parallelism of a vertex with a FORWARD edge is changed. This 
> can happen with both the autoscaler and the adaptive scheduler, where one can 
> change the vertex parallelism dynamically. A fix was applied to dynamically 
> replace the partitioner from FORWARD to REBALANCE on task deployment in 
> {{StreamTask}}: 
>  
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
> *Problem*
> Unfortunately, the fix is still buggy in two respects:
>  # The connections between upstream and downstream tasks are determined by 
> the distribution type of the partitioner when generating the execution graph 
> on the JM side. When the edge is FORWARD, the distribution type is POINTWISE, 
> and Flink will try to evenly distribute subpartitions to all downstream 
> tasks. If one wants to change it to REBALANCE, the distribution type has to 
> be changed to ALL_TO_ALL to make all-to-all connections between upstream and 
> downstream tasks. However, the fix did not change the distribution type, so 
> the network connections are set up incorrectly.
>  # The FORWARD partitioner is replaced only if 
> environment.getWriter(outputIndex).getNumberOfSubpartitions() does not equal 
> the task parallelism. However, the number of subpartitions here equals the 
> number of downstream tasks of this particular task, which is also determined 
> by the distribution type of the partitioner when generating the execution 
> graph on the JM side. When the floor or ceil of (downstream task parallelism 
> / upstream task parallelism) equals the upstream task parallelism, some task 
> will have its number of subpartitions equal to the task parallelism. For 
> example, for a topology A (parallelism 2) -> B (parallelism 5), one A task 
> has 2 subpartitions and the other A task has 3 subpartitions. The task with 2 
> subpartitions has its number of subpartitions equal to the task parallelism 
> (2) and therefore skips partitioner replacement. As a result, that task sends 
> data to only one downstream task, because the FORWARD partitioner always 
> sends data to the first subpartition. In fact, for a normal job with a 
> FORWARD edge and no autoscaling action, you will find that the partitioner is 
> changed to REBALANCE internally, because the number of subpartitions always 
> equals 1 in this case.
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
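
The second problem in the quoted description can be reproduced with a little 
arithmetic. The sketch below is not Flink's code: the class and method names 
are hypothetical, and it assumes an even pointwise split in which each upstream 
task gets either floor or ceil of (downstream / upstream) subpartitions. Which 
upstream index receives the larger share is an artifact of this sketch. It also 
assumes the partitioner is left as FORWARD exactly when a task's subpartition 
count equals its own parallelism, as in the A -> B example.

```java
import java.util.ArrayList;
import java.util.List;

final class PointwiseSubpartitionSketch {

    /**
     * Subpartition count per upstream task under an assumed even pointwise
     * split. Only covers the case downstream >= upstream.
     */
    static int[] subpartitionsPerUpstreamTask(int upstream, int downstream) {
        if (upstream < 1 || downstream < upstream) {
            throw new IllegalArgumentException("requires 1 <= upstream <= downstream");
        }
        int[] counts = new int[upstream];
        for (int i = 0; i < upstream; i++) {
            // Upstream task i serves downstream tasks in [start, end).
            int start = i * downstream / upstream;
            int end = (i + 1) * downstream / upstream;
            counts[i] = end - start;
        }
        return counts;
    }

    /**
     * Upstream task indices that would keep the FORWARD partitioner, i.e. whose
     * subpartition count happens to equal the upstream parallelism.
     */
    static List<Integer> tasksKeepingForward(int upstream, int downstream) {
        int[] counts = subpartitionsPerUpstreamTask(upstream, downstream);
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < counts.length; i++) {
            if (counts[i] == upstream) {
                kept.add(i);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // A (parallelism 2) -> B (parallelism 5), as in the issue description.
        int[] counts = subpartitionsPerUpstreamTask(2, 5);
        System.out.println(counts[0] + " and " + counts[1] + " subpartitions");
        System.out.println("tasks keeping FORWARD: " + tasksKeepingForward(2, 5));
    }
}
```

For 2 -> 5 exactly one upstream task ends up with 2 subpartitions and keeps 
FORWARD, so it writes only to its first subpartition.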



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
